
Commit

fix: Unable to run vectordb_bench and cli
fix: remove trailing commas from logging strings
fix: CLI unable to run (zilliztech#444)

Signed-off-by: yangxuan <[email protected]>
XuanYang-cn committed Jan 10, 2025
1 parent 6ee3dbe commit b1bdb44
Showing 15 changed files with 88 additions and 151 deletions.
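Most of the changes below share one root cause: a trailing comma inside a parenthesized group of f-strings turns what looks like implicit string concatenation into a tuple (or into an extra call argument), so loggers and click receive something other than a single string. Here is a minimal standalone sketch of the two shapes, reusing the pgvector log message but with hard-coded stand-in values (not real run data):

```python
table_name, err = "pg_table", ValueError("boom")  # illustrative stand-ins

broken = (
    f"Failed to insert data into pgvector table ({table_name}), ",
    f"error: {err}",
)
fixed = (
    f"Failed to insert data into pgvector table ({table_name}), "
    f"error: {err}"
)

print(type(broken).__name__)  # tuple -- the commas build a 2-tuple, so a tuple repr gets logged
print(type(fixed).__name__)   # str   -- adjacent literals concatenate into one message
```

The diffs below either drop the stray commas while keeping the multi-line concatenation, or collapse the message onto a single line; the enum and CLI changes are separate fixes discussed where they appear.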
1 change: 1 addition & 0 deletions pyproject.toml
@@ -133,6 +133,7 @@ lint.ignore = [
"RUF017",
"C416",
"PLW0603",
"COM812",
]

# Allow autofix for all enabled rules (when `--fix`) is provided.
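A brief note on the lint change, partly an assumption: COM812 is the missing-trailing-comma rule (from flake8-commas, as implemented in ruff), which asks for a trailing comma whenever the closing bracket of a multi-line call or collection sits on its own line. Ignoring it presumably keeps `ruff check --fix` from re-adding commas to the multi-line log calls touched below; it is also a rule ruff suggests disabling when its formatter is in use. A minimal sketch of what the rule enforces:

```python
# Illustration only: the shape COM812 flags vs. the shape it accepts.
import logging

log = logging.getLogger("demo")

log.info(
    "part one "
    "part two"   # COM812: missing trailing comma after the last argument
)

log.info(
    "part one "
    "part two",  # COM812-compliant; harmless for a call (still a single argument)
)
```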
14 changes: 12 additions & 2 deletions vectordb_bench/backend/clients/__init__.py
@@ -42,7 +42,7 @@ class DB(Enum):
AliyunOpenSearch = "AliyunOpenSearch"

@property
def init_cls(self) -> type[VectorDB]: # noqa: PLR0911, PLR0912
def init_cls(self) -> type[VectorDB]: # noqa: PLR0911, PLR0912, C901
"""Import while in use"""
if self == DB.Milvus:
from .milvus.milvus import Milvus
@@ -129,11 +129,16 @@ def init_cls(self) -> type[VectorDB]:  # noqa: PLR0911, PLR0912

return AliyunOpenSearch

if self == DB.Test:
from .test.test import Test

return Test

msg = f"Unknown DB: {self.name}"
raise ValueError(msg)

@property
def config_cls(self) -> type[DBConfig]: # noqa: PLR0911, PLR0912
def config_cls(self) -> type[DBConfig]: # noqa: PLR0911, PLR0912, C901
"""Import while in use"""
if self == DB.Milvus:
from .milvus.config import MilvusConfig
@@ -220,6 +225,11 @@ def config_cls(self) -> type[DBConfig]:  # noqa: PLR0911, PLR0912

return AliyunOpenSearchConfig

if self == DB.Test:
from .test.config import TestConfig

return TestConfig

msg = f"Unknown DB: {self.name}"
raise ValueError(msg)

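For context (a reading of the diff, not text from the commit): `DB.init_cls` and `DB.config_cls` import the concrete client lazily, one `if` branch per backend, and fall through to `raise ValueError(f"Unknown DB: {self.name}")`. Before this commit the `Test` member had no branch, so selecting the built-in test client died on that ValueError. A simplified, self-contained sketch of the dispatch pattern, using a stand-in class rather than the real imports:

```python
from enum import Enum


class FakeTestClient:
    """Stand-in for vectordb_bench.backend.clients.test.test.Test (illustration only)."""


class DB(Enum):
    Milvus = "Milvus"
    Test = "Test"

    @property
    def init_cls(self):
        # The real property defers each import into its branch so optional backend
        # dependencies are only imported when that backend is selected.
        if self == DB.Test:
            return FakeTestClient
        msg = f"Unknown DB: {self.name}"  # pre-fix, DB.Test fell through to here
        raise ValueError(msg)


print(DB.Test.init_cls)  # resolves now that the Test branch exists
```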
4 changes: 2 additions & 2 deletions vectordb_bench/backend/clients/memorydb/cli.py
@@ -43,8 +43,8 @@ class MemoryDBTypedDict(TypedDict):
show_default=True,
default=False,
help=(
"Cluster Mode Disabled (CMD), use this flag when testing locally on a single node instance.",
" In production, MemoryDB only supports cluster mode (CME)",
"Cluster Mode Disabled (CMD), use this flag when testing locally on a single node instance."
" In production, MemoryDB only supports cluster mode (CME)"
),
),
]
9 changes: 2 additions & 7 deletions vectordb_bench/backend/clients/pgvecto_rs/pgvecto_rs.py
@@ -200,10 +200,7 @@ def _create_index(self):
self.cursor.execute(index_create_sql)
self.conn.commit()
except Exception as e:
log.warning(
f"Failed to create pgvecto.rs index {self._index_name} \
at table {self.table_name} error: {e}",
)
log.warning(f"Failed to create pgvecto.rs index {self._index_name} at table {self.table_name} error: {e}")
raise e from None

def _create_table(self, dim: int):
@@ -258,9 +255,7 @@ def insert_embeddings(

return len(metadata), None
except Exception as e:
log.warning(
f"Failed to insert data into pgvecto.rs table ({self.table_name}), error: {e}",
)
log.warning(f"Failed to insert data into pgvecto.rs table ({self.table_name}), error: {e}")
return 0, e

def search_embedding(
4 changes: 1 addition & 3 deletions vectordb_bench/backend/clients/pgvector/pgvector.py
@@ -415,9 +415,7 @@ def insert_embeddings(

return len(metadata), None
except Exception as e:
log.warning(
f"Failed to insert data into pgvector table ({self.table_name}), error: {e}",
)
log.warning(f"Failed to insert data into pgvector table ({self.table_name}), error: {e}")
return 0, e

def search_embedding(
@@ -255,9 +255,7 @@ def insert_embeddings(

return len(metadata), None
except Exception as e:
log.warning(
f"Failed to insert data into pgvector table ({self.table_name}), error: {e}",
)
log.warning(f"Failed to insert data into pgvector table ({self.table_name}), error: {e}")
return 0, e

def search_embedding(
4 changes: 2 additions & 2 deletions vectordb_bench/backend/clients/qdrant_cloud/qdrant_cloud.py
@@ -76,8 +76,8 @@ def optimize(self):
continue
if info.status == CollectionStatus.GREEN:
msg = (
f"Stored vectors: {info.vectors_count}, Indexed vectors: {info.indexed_vectors_count}, ",
f"Collection status: {info.indexed_vectors_count}",
f"Stored vectors: {info.vectors_count}, Indexed vectors: {info.indexed_vectors_count}, "
f"Collection status: {info.indexed_vectors_count}"
)
log.info(msg)
return
2 changes: 1 addition & 1 deletion vectordb_bench/backend/clients/test/cli.py
@@ -17,7 +17,7 @@ class TestTypedDict(CommonTypedDict): ...
@click_parameter_decorators_from_typed_dict(TestTypedDict)
def Test(**parameters: Unpack[TestTypedDict]):
run(
db=DB.NewClient,
db=DB.Test,
db_config=TestConfig(db_label=parameters["db_label"]),
db_case_config=TestIndexConfig(),
**parameters,
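The CLI half of the fix, with a hedged reading: `NewClient` is not a member of the `DB` enum, so evaluating `DB.NewClient` inside the `Test` command raises `AttributeError` before `run()` ever dispatches, which is presumably the "cli unable to run" symptom behind zilliztech#444. A tiny illustration, not the project's code:

```python
from enum import Enum


class DB(Enum):
    Test = "Test"


try:
    _ = DB.NewClient  # pre-fix lookup: no such enum member
except AttributeError as e:
    print(f"AttributeError: {e}")

print(DB.Test)  # the corrected lookup used by the Test command
```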
16 changes: 4 additions & 12 deletions vectordb_bench/backend/data_source.py
@@ -63,9 +63,7 @@ def validate_file(self, remote: pathlib.Path, local: pathlib.Path) -> bool:
# check size equal
remote_size, local_size = info.content_length, local.stat().st_size
if remote_size != local_size:
log.info(
f"local file: {local} size[{local_size}] not match with remote size[{remote_size}]",
)
log.info(f"local file: {local} size[{local_size}] not match with remote size[{remote_size}]")
return False

return True
@@ -89,9 +87,7 @@ def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path):
local_file = local_ds_root.joinpath(file)

if (not local_file.exists()) or (not self.validate_file(remote_file, local_file)):
log.info(
f"local file: {local_file} not match with remote: {remote_file}; add to downloading list",
)
log.info(f"local file: {local_file} not match with remote: {remote_file}; add to downloading list")
downloads.append((remote_file, local_file))

if len(downloads) == 0:
@@ -135,9 +131,7 @@ def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path):
local_file = local_ds_root.joinpath(file)

if (not local_file.exists()) or (not self.validate_file(remote_file, local_file)):
log.info(
f"local file: {local_file} not match with remote: {remote_file}; add to downloading list",
)
log.info(f"local file: {local_file} not match with remote: {remote_file}; add to downloading list")
downloads.append(remote_file)

if len(downloads) == 0:
@@ -157,9 +151,7 @@ def validate_file(self, remote: pathlib.Path, local: pathlib.Path) -> bool:
# check size equal
remote_size, local_size = info.get("size"), local.stat().st_size
if remote_size != local_size:
log.info(
f"local file: {local} size[{local_size}] not match with remote size[{remote_size}]",
)
log.info(f"local file: {local} size[{local_size}] not match with remote size[{remote_size}]")
return False

return True
50 changes: 16 additions & 34 deletions vectordb_bench/backend/runner/mp_runner.py
@@ -79,24 +79,22 @@ def search(

if count % 500 == 0:
log.debug(
f"({mp.current_process().name:16}) ",
f"search_count: {count}, latest_latency={time.perf_counter()-s}",
f"({mp.current_process().name:16}) "
f"search_count: {count}, latest_latency={time.perf_counter()-s}"
)

total_dur = round(time.perf_counter() - start_time, 4)
log.info(
f"{mp.current_process().name:16} search {self.duration}s: "
f"actual_dur={total_dur}s, count={count}, qps in this process: {round(count / total_dur, 4):3}",
f"actual_dur={total_dur}s, count={count}, qps in this process: {round(count / total_dur, 4):3}"
)

return (count, total_dur, latencies)

@staticmethod
def get_mp_context():
mp_start_method = "spawn"
log.debug(
f"MultiProcessingSearchRunner get multiprocessing start method: {mp_start_method}",
)
log.debug(f"MultiProcessingSearchRunner get multiprocessing start method: {mp_start_method}")
return mp.get_context(mp_start_method)

def _run_all_concurrencies_mem_efficient(self):
@@ -113,9 +111,7 @@ def _run_all_concurrencies_mem_efficient(self):
mp_context=self.get_mp_context(),
max_workers=conc,
) as executor:
log.info(
f"Start search {self.duration}s in concurrency {conc}, filters: {self.filters}",
)
log.info(f"Start search {self.duration}s in concurrency {conc}, filters: {self.filters}")
future_iter = [executor.submit(self.search, self.test_data, q, cond) for i in range(conc)]
# Sync all processes
while q.qsize() < conc:
@@ -124,9 +120,7 @@ def _run_all_concurrencies_mem_efficient(self):

with cond:
cond.notify_all()
log.info(
f"Syncing all process and start concurrency search, concurrency={conc}",
)
log.info(f"Syncing all process and start concurrency search, concurrency={conc}")

start = time.perf_counter()
all_count = sum([r.result()[0] for r in future_iter])
@@ -140,18 +134,14 @@ def _run_all_concurrencies_mem_efficient(self):
conc_qps_list.append(qps)
conc_latency_p99_list.append(latency_p99)
conc_latency_avg_list.append(latency_avg)
log.info(
f"End search in concurrency {conc}: dur={cost}s, total_count={all_count}, qps={qps}",
)
log.info(f"End search in concurrency {conc}: dur={cost}s, total_count={all_count}, qps={qps}")

if qps > max_qps:
max_qps = qps
log.info(
f"Update largest qps with concurrency {conc}: current max_qps={max_qps}",
)
log.info(f"Update largest qps with concurrency {conc}: current max_qps={max_qps}")
except Exception as e:
log.warning(
f"Fail to search all concurrencies: {self.concurrencies}, max_qps before failure={max_qps}, reason={e}",
f"Fail to search, concurrencies: {self.concurrencies}, max_qps before failure={max_qps}, reason={e}"
)
traceback.print_exc()

@@ -193,9 +183,7 @@ def _run_by_dur(self, duration: int) -> float:
mp_context=self.get_mp_context(),
max_workers=conc,
) as executor:
log.info(
f"Start search_by_dur {duration}s in concurrency {conc}, filters: {self.filters}",
)
log.info(f"Start search_by_dur {duration}s in concurrency {conc}, filters: {self.filters}")
future_iter = [
executor.submit(self.search_by_dur, duration, self.test_data, q, cond) for i in range(conc)
]
@@ -206,24 +194,18 @@ def _run_by_dur(self, duration: int) -> float:

with cond:
cond.notify_all()
log.info(
f"Syncing all process and start concurrency search, concurrency={conc}",
)
log.info(f"Syncing all process and start concurrency search, concurrency={conc}")

start = time.perf_counter()
all_count = sum([r.result() for r in future_iter])
cost = time.perf_counter() - start

qps = round(all_count / cost, 4)
log.info(
f"End search in concurrency {conc}: dur={cost}s, total_count={all_count}, qps={qps}",
)
log.info(f"End search in concurrency {conc}: dur={cost}s, total_count={all_count}, qps={qps}")

if qps > max_qps:
max_qps = qps
log.info(
f"Update largest qps with concurrency {conc}: current max_qps={max_qps}",
)
log.info(f"Update largest qps with concurrency {conc}: current max_qps={max_qps}")
except Exception as e:
log.warning(
f"Fail to search all concurrencies: {self.concurrencies}, max_qps before failure={max_qps}, reason={e}",
@@ -275,14 +257,14 @@ def search_by_dur(

if count % 500 == 0:
log.debug(
f"({mp.current_process().name:16}) search_count: {count}, ",
f"latest_latency={time.perf_counter()-s}",
f"({mp.current_process().name:16}) search_count: {count}, "
f"latest_latency={time.perf_counter()-s}"
)

total_dur = round(time.perf_counter() - start_time, 4)
log.debug(
f"{mp.current_process().name:16} search {self.duration}s: "
f"actual_dur={total_dur}s, count={count}, qps in this process: {round(count / total_dur, 4):3}",
f"actual_dur={total_dur}s, count={count}, qps in this process: {round(count / total_dur, 4):3}"
)

return count
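In the runner the stray commas did something slightly different: they split one message into `msg` plus an extra positional argument, and the `logging` module treats positional arguments as %-formatting data for the message. With no `%s` placeholder in the message, the record fails to format and the second half of the line never shows up. A small self-contained reproduction with hard-coded stand-in values (not output from a real run):

```python
import logging

logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger("demo")

# Pre-fix shape: two positional arguments. When the record is emitted, logging tries
# "(worker-1) ..." % ("search_count...",), which fails and is reported as a logging
# error instead of printing the full line.
log.debug(
    "(worker-1)       ",
    "search_count: 500, latest_latency=0.0123",
)

# Post-fix shape: adjacent string literals concatenate into a single msg argument.
log.debug(
    "(worker-1)       "
    "search_count: 500, latest_latency=0.0123"
)
```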
8 changes: 4 additions & 4 deletions vectordb_bench/backend/runner/rate_runner.py
@@ -73,14 +73,14 @@ def submit_by_rate() -> bool:

if len(not_done) > 0:
log.warning(
f"Failed to finish all tasks in 1s, [{len(not_done)}/{len(executing_futures)}] ",
f"tasks are not done, waited={wait_interval:.2f}, trying to wait in the next round",
f"Failed to finish all tasks in 1s, [{len(not_done)}/{len(executing_futures)}] "
f"tasks are not done, waited={wait_interval:.2f}, trying to wait in the next round"
)
executing_futures = list(not_done)
else:
log.debug(
f"Finished {len(executing_futures)} insert-{config.NUM_PER_BATCH} ",
f"task in 1s, wait_interval={wait_interval:.2f}",
f"Finished {len(executing_futures)} insert-{config.NUM_PER_BATCH} "
f"task in 1s, wait_interval={wait_interval:.2f}"
)
executing_futures = []
except Exception as e:
24 changes: 10 additions & 14 deletions vectordb_bench/backend/runner/read_write_runner.py
@@ -45,8 +45,8 @@ def __init__(
self.read_dur_after_write = read_dur_after_write

log.info(
f"Init runner, concurencys={concurrencies}, search_stage={search_stage}, ",
f"stage_search_dur={read_dur_after_write}",
f"Init runner, concurencys={concurrencies}, search_stage={search_stage}, "
f"stage_search_dur={read_dur_after_write}"
)

test_emb = np.stack(dataset.test_data["emb"])
@@ -88,12 +88,10 @@ def run_search(self):
res, ssearch_dur = self.serial_search_runner.run()
recall, ndcg, p99_latency = res
log.info(
f"Search after write - Serial search - recall={recall}, ndcg={ndcg}, p99={p99_latency}, ",
f"Search after write - Serial search - recall={recall}, ndcg={ndcg}, p99={p99_latency}, "
f"dur={ssearch_dur:.4f}",
)
log.info(
f"Search after wirte - Conc search start, dur for each conc={self.read_dur_after_write}",
)
log.info(f"Search after wirte - Conc search start, dur for each conc={self.read_dur_after_write}")
max_qps = self.run_by_dur(self.read_dur_after_write)
log.info(f"Search after wirte - Conc search finished, max_qps={max_qps}")

@@ -157,18 +155,16 @@ def wait_next_target(start: int, target_batch: int) -> bool:

got = wait_next_target(start_batch, target_batch)
if got is False:
log.warning(
f"Abnormal exit, target_batch={target_batch}, start_batch={start_batch}",
)
log.warning(f"Abnormal exit, target_batch={target_batch}, start_batch={start_batch}")
return None

log.info(f"Insert {perc}% done, total batch={total_batch}")
log.info(f"[{target_batch}/{total_batch}] Serial search - {perc}% start")
res, ssearch_dur = self.serial_search_runner.run()
recall, ndcg, p99_latency = res
log.info(
f"[{target_batch}/{total_batch}] Serial search - {perc}% done, recall={recall}, ",
f"ndcg={ndcg}, p99={p99_latency}, dur={ssearch_dur:.4f}",
f"[{target_batch}/{total_batch}] Serial search - {perc}% done, recall={recall}, "
f"ndcg={ndcg}, p99={p99_latency}, dur={ssearch_dur:.4f}"
)

# Search duration for non-last search stage is carefully calculated.
Expand All @@ -183,8 +179,8 @@ def wait_next_target(start: int, target_batch: int) -> bool:
each_conc_search_dur = csearch_dur / len(self.concurrencies)
if each_conc_search_dur < 30:
warning_msg = (
f"Results might be inaccurate, duration[{csearch_dur:.4f}] left for conc-search is too short, ",
f"total available dur={total_dur_between_stages}, serial_search_cost={ssearch_dur}.",
f"Results might be inaccurate, duration[{csearch_dur:.4f}] left for conc-search is too short, "
f"total available dur={total_dur_between_stages}, serial_search_cost={ssearch_dur}."
)
log.warning(warning_msg)

@@ -193,7 +189,7 @@ def wait_next_target(start: int, target_batch: int) -> bool:
each_conc_search_dur = 60

log.info(
f"[{target_batch}/{total_batch}] Concurrent search - {perc}% start, dur={each_conc_search_dur:.4f}",
f"[{target_batch}/{total_batch}] Concurrent search - {perc}% start, dur={each_conc_search_dur:.4f}"
)
max_qps = self.run_by_dur(each_conc_search_dur)
result.append((perc, max_qps, recall, ndcg, p99_latency))