diff --git a/pyproject.toml b/pyproject.toml
index 31294063..6259bcea 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -133,6 +133,7 @@ lint.ignore = [
     "RUF017",
     "C416",
     "PLW0603",
+    "COM812",
 ]
 
 # Allow autofix for all enabled rules (when `--fix`) is provided.
diff --git a/vectordb_bench/backend/clients/__init__.py b/vectordb_bench/backend/clients/__init__.py
index 773cd494..e796aa06 100644
--- a/vectordb_bench/backend/clients/__init__.py
+++ b/vectordb_bench/backend/clients/__init__.py
@@ -42,7 +42,7 @@ class DB(Enum):
     AliyunOpenSearch = "AliyunOpenSearch"
 
     @property
-    def init_cls(self) -> type[VectorDB]:  # noqa: PLR0911, PLR0912
+    def init_cls(self) -> type[VectorDB]:  # noqa: PLR0911, PLR0912, C901
        """Import while in use"""
        if self == DB.Milvus:
            from .milvus.milvus import Milvus
@@ -129,11 +129,16 @@ def init_cls(self) -> type[VectorDB]:  # noqa: PLR0911, PLR0912
 
             return AliyunOpenSearch
 
+        if self == DB.Test:
+            from .test.test import Test
+
+            return Test
+
         msg = f"Unknown DB: {self.name}"
         raise ValueError(msg)
 
     @property
-    def config_cls(self) -> type[DBConfig]:  # noqa: PLR0911, PLR0912
+    def config_cls(self) -> type[DBConfig]:  # noqa: PLR0911, PLR0912, C901
         """Import while in use"""
         if self == DB.Milvus:
             from .milvus.config import MilvusConfig
@@ -220,6 +225,11 @@ def config_cls(self) -> type[DBConfig]:  # noqa: PLR0911, PLR0912
 
             return AliyunOpenSearchConfig
 
+        if self == DB.Test:
+            from .test.config import TestConfig
+
+            return TestConfig
+
         msg = f"Unknown DB: {self.name}"
         raise ValueError(msg)
diff --git a/vectordb_bench/backend/clients/memorydb/cli.py b/vectordb_bench/backend/clients/memorydb/cli.py
index ae00bfd1..568eec2a 100644
--- a/vectordb_bench/backend/clients/memorydb/cli.py
+++ b/vectordb_bench/backend/clients/memorydb/cli.py
@@ -43,8 +43,8 @@ class MemoryDBTypedDict(TypedDict):
         show_default=True,
         default=False,
         help=(
-            "Cluster Mode Disabled (CMD), use this flag when testing locally on a single node instance.",
-            " In production, MemoryDB only supports cluster mode (CME)",
+            "Cluster Mode Disabled (CMD), use this flag when testing locally on a single node instance."
+ " In production, MemoryDB only supports cluster mode (CME)" ), ), ] diff --git a/vectordb_bench/backend/clients/pgvecto_rs/pgvecto_rs.py b/vectordb_bench/backend/clients/pgvecto_rs/pgvecto_rs.py index fc4f1780..64e95a1b 100644 --- a/vectordb_bench/backend/clients/pgvecto_rs/pgvecto_rs.py +++ b/vectordb_bench/backend/clients/pgvecto_rs/pgvecto_rs.py @@ -200,10 +200,7 @@ def _create_index(self): self.cursor.execute(index_create_sql) self.conn.commit() except Exception as e: - log.warning( - f"Failed to create pgvecto.rs index {self._index_name} \ - at table {self.table_name} error: {e}", - ) + log.warning(f"Failed to create pgvecto.rs index {self._index_name} at table {self.table_name} error: {e}") raise e from None def _create_table(self, dim: int): @@ -258,9 +255,7 @@ def insert_embeddings( return len(metadata), None except Exception as e: - log.warning( - f"Failed to insert data into pgvecto.rs table ({self.table_name}), error: {e}", - ) + log.warning(f"Failed to insert data into pgvecto.rs table ({self.table_name}), error: {e}") return 0, e def search_embedding( diff --git a/vectordb_bench/backend/clients/pgvector/pgvector.py b/vectordb_bench/backend/clients/pgvector/pgvector.py index 62a7971b..bd024175 100644 --- a/vectordb_bench/backend/clients/pgvector/pgvector.py +++ b/vectordb_bench/backend/clients/pgvector/pgvector.py @@ -415,9 +415,7 @@ def insert_embeddings( return len(metadata), None except Exception as e: - log.warning( - f"Failed to insert data into pgvector table ({self.table_name}), error: {e}", - ) + log.warning(f"Failed to insert data into pgvector table ({self.table_name}), error: {e}") return 0, e def search_embedding( diff --git a/vectordb_bench/backend/clients/pgvectorscale/pgvectorscale.py b/vectordb_bench/backend/clients/pgvectorscale/pgvectorscale.py index 981accc2..ca7d809b 100644 --- a/vectordb_bench/backend/clients/pgvectorscale/pgvectorscale.py +++ b/vectordb_bench/backend/clients/pgvectorscale/pgvectorscale.py @@ -255,9 +255,7 @@ def insert_embeddings( return len(metadata), None except Exception as e: - log.warning( - f"Failed to insert data into pgvector table ({self.table_name}), error: {e}", - ) + log.warning(f"Failed to insert data into pgvector table ({self.table_name}), error: {e}") return 0, e def search_embedding( diff --git a/vectordb_bench/backend/clients/qdrant_cloud/qdrant_cloud.py b/vectordb_bench/backend/clients/qdrant_cloud/qdrant_cloud.py index 0861e893..a0d146a7 100644 --- a/vectordb_bench/backend/clients/qdrant_cloud/qdrant_cloud.py +++ b/vectordb_bench/backend/clients/qdrant_cloud/qdrant_cloud.py @@ -76,8 +76,8 @@ def optimize(self): continue if info.status == CollectionStatus.GREEN: msg = ( - f"Stored vectors: {info.vectors_count}, Indexed vectors: {info.indexed_vectors_count}, ", - f"Collection status: {info.indexed_vectors_count}", + f"Stored vectors: {info.vectors_count}, Indexed vectors: {info.indexed_vectors_count}, " + f"Collection status: {info.indexed_vectors_count}" ) log.info(msg) return diff --git a/vectordb_bench/backend/clients/test/cli.py b/vectordb_bench/backend/clients/test/cli.py index e5cd4c78..2dcc4c40 100644 --- a/vectordb_bench/backend/clients/test/cli.py +++ b/vectordb_bench/backend/clients/test/cli.py @@ -17,7 +17,7 @@ class TestTypedDict(CommonTypedDict): ... 
 @click_parameter_decorators_from_typed_dict(TestTypedDict)
 def Test(**parameters: Unpack[TestTypedDict]):
     run(
-        db=DB.NewClient,
+        db=DB.Test,
         db_config=TestConfig(db_label=parameters["db_label"]),
         db_case_config=TestIndexConfig(),
         **parameters,
diff --git a/vectordb_bench/backend/data_source.py b/vectordb_bench/backend/data_source.py
index b98dc7d7..139d2e30 100644
--- a/vectordb_bench/backend/data_source.py
+++ b/vectordb_bench/backend/data_source.py
@@ -63,9 +63,7 @@ def validate_file(self, remote: pathlib.Path, local: pathlib.Path) -> bool:
         # check size equal
         remote_size, local_size = info.content_length, local.stat().st_size
         if remote_size != local_size:
-            log.info(
-                f"local file: {local} size[{local_size}] not match with remote size[{remote_size}]",
-            )
+            log.info(f"local file: {local} size[{local_size}] not match with remote size[{remote_size}]")
             return False
 
         return True
@@ -89,9 +87,7 @@ def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path):
             local_file = local_ds_root.joinpath(file)
 
             if (not local_file.exists()) or (not self.validate_file(remote_file, local_file)):
-                log.info(
-                    f"local file: {local_file} not match with remote: {remote_file}; add to downloading list",
-                )
+                log.info(f"local file: {local_file} not match with remote: {remote_file}; add to downloading list")
                 downloads.append((remote_file, local_file))
 
         if len(downloads) == 0:
@@ -135,9 +131,7 @@ def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path):
             local_file = local_ds_root.joinpath(file)
 
             if (not local_file.exists()) or (not self.validate_file(remote_file, local_file)):
-                log.info(
-                    f"local file: {local_file} not match with remote: {remote_file}; add to downloading list",
-                )
+                log.info(f"local file: {local_file} not match with remote: {remote_file}; add to downloading list")
                 downloads.append(remote_file)
 
         if len(downloads) == 0:
@@ -157,9 +151,7 @@ def validate_file(self, remote: pathlib.Path, local: pathlib.Path) -> bool:
         # check size equal
         remote_size, local_size = info.get("size"), local.stat().st_size
         if remote_size != local_size:
-            log.info(
-                f"local file: {local} size[{local_size}] not match with remote size[{remote_size}]",
-            )
+            log.info(f"local file: {local} size[{local_size}] not match with remote size[{remote_size}]")
             return False
 
         return True
diff --git a/vectordb_bench/backend/runner/mp_runner.py b/vectordb_bench/backend/runner/mp_runner.py
index 5b69b548..687a0ecd 100644
--- a/vectordb_bench/backend/runner/mp_runner.py
+++ b/vectordb_bench/backend/runner/mp_runner.py
@@ -79,14 +79,14 @@ def search(
 
                 if count % 500 == 0:
                     log.debug(
-                        f"({mp.current_process().name:16}) ",
-                        f"search_count: {count}, latest_latency={time.perf_counter()-s}",
+                        f"({mp.current_process().name:16}) "
+                        f"search_count: {count}, latest_latency={time.perf_counter()-s}"
                     )
 
         total_dur = round(time.perf_counter() - start_time, 4)
         log.info(
             f"{mp.current_process().name:16} search {self.duration}s: "
-            f"actual_dur={total_dur}s, count={count}, qps in this process: {round(count / total_dur, 4):3}",
+            f"actual_dur={total_dur}s, count={count}, qps in this process: {round(count / total_dur, 4):3}"
         )
 
         return (count, total_dur, latencies)
@@ -94,9 +94,7 @@ def search(
     @staticmethod
     def get_mp_context():
         mp_start_method = "spawn"
-        log.debug(
-            f"MultiProcessingSearchRunner get multiprocessing start method: {mp_start_method}",
-        )
+        log.debug(f"MultiProcessingSearchRunner get multiprocessing start method: {mp_start_method}")
         return mp.get_context(mp_start_method)
 
     def _run_all_concurrencies_mem_efficient(self):
@@ -113,9 +111,7 @@ def _run_all_concurrencies_mem_efficient(self):
                     mp_context=self.get_mp_context(),
                     max_workers=conc,
                 ) as executor:
-                    log.info(
-                        f"Start search {self.duration}s in concurrency {conc}, filters: {self.filters}",
-                    )
+                    log.info(f"Start search {self.duration}s in concurrency {conc}, filters: {self.filters}")
                     future_iter = [executor.submit(self.search, self.test_data, q, cond) for i in range(conc)]
                     # Sync all processes
                     while q.qsize() < conc:
@@ -124,9 +120,7 @@ def _run_all_concurrencies_mem_efficient(self):
 
                     with cond:
                         cond.notify_all()
-                        log.info(
-                            f"Syncing all process and start concurrency search, concurrency={conc}",
-                        )
+                        log.info(f"Syncing all process and start concurrency search, concurrency={conc}")
                         start = time.perf_counter()
 
                     all_count = sum([r.result()[0] for r in future_iter])
@@ -140,18 +134,14 @@ def _run_all_concurrencies_mem_efficient(self):
                     conc_qps_list.append(qps)
                     conc_latency_p99_list.append(latency_p99)
                     conc_latency_avg_list.append(latency_avg)
-                    log.info(
-                        f"End search in concurrency {conc}: dur={cost}s, total_count={all_count}, qps={qps}",
-                    )
+                    log.info(f"End search in concurrency {conc}: dur={cost}s, total_count={all_count}, qps={qps}")
 
                     if qps > max_qps:
                         max_qps = qps
-                        log.info(
-                            f"Update largest qps with concurrency {conc}: current max_qps={max_qps}",
-                        )
+                        log.info(f"Update largest qps with concurrency {conc}: current max_qps={max_qps}")
         except Exception as e:
             log.warning(
-                f"Fail to search all concurrencies: {self.concurrencies}, max_qps before failure={max_qps}, reason={e}",
+                f"Fail to search, concurrencies: {self.concurrencies}, max_qps before failure={max_qps}, reason={e}"
             )
             traceback.print_exc()
 
@@ -193,9 +183,7 @@ def _run_by_dur(self, duration: int) -> float:
                     mp_context=self.get_mp_context(),
                     max_workers=conc,
                 ) as executor:
-                    log.info(
-                        f"Start search_by_dur {duration}s in concurrency {conc}, filters: {self.filters}",
-                    )
+                    log.info(f"Start search_by_dur {duration}s in concurrency {conc}, filters: {self.filters}")
                     future_iter = [
                         executor.submit(self.search_by_dur, duration, self.test_data, q, cond) for i in range(conc)
                     ]
@@ -206,24 +194,18 @@ def _run_by_dur(self, duration: int) -> float:
 
                     with cond:
                         cond.notify_all()
-                        log.info(
-                            f"Syncing all process and start concurrency search, concurrency={conc}",
-                        )
+                        log.info(f"Syncing all process and start concurrency search, concurrency={conc}")
                         start = time.perf_counter()
 
                     all_count = sum([r.result() for r in future_iter])
                     cost = time.perf_counter() - start
 
                     qps = round(all_count / cost, 4)
-                    log.info(
-                        f"End search in concurrency {conc}: dur={cost}s, total_count={all_count}, qps={qps}",
-                    )
+                    log.info(f"End search in concurrency {conc}: dur={cost}s, total_count={all_count}, qps={qps}")
 
                     if qps > max_qps:
                         max_qps = qps
-                        log.info(
-                            f"Update largest qps with concurrency {conc}: current max_qps={max_qps}",
-                        )
+                        log.info(f"Update largest qps with concurrency {conc}: current max_qps={max_qps}")
         except Exception as e:
             log.warning(
                 f"Fail to search all concurrencies: {self.concurrencies}, max_qps before failure={max_qps}, reason={e}",
@@ -275,14 +257,14 @@ def search_by_dur(
 
                 if count % 500 == 0:
                     log.debug(
-                        f"({mp.current_process().name:16}) search_count: {count}, ",
-                        f"latest_latency={time.perf_counter()-s}",
+                        f"({mp.current_process().name:16}) search_count: {count}, "
+                        f"latest_latency={time.perf_counter()-s}"
                     )
 
         total_dur = round(time.perf_counter() - start_time, 4)
         log.debug(
             f"{mp.current_process().name:16} search {self.duration}s: "
-            f"actual_dur={total_dur}s, count={count}, qps in this process: {round(count / total_dur, 4):3}",
+            f"actual_dur={total_dur}s, count={count}, qps in this process: {round(count / total_dur, 4):3}"
        )
 
         return count
diff --git a/vectordb_bench/backend/runner/rate_runner.py b/vectordb_bench/backend/runner/rate_runner.py
index 0145af4c..4b32bcd9 100644
--- a/vectordb_bench/backend/runner/rate_runner.py
+++ b/vectordb_bench/backend/runner/rate_runner.py
@@ -73,14 +73,14 @@ def submit_by_rate() -> bool:
 
                 if len(not_done) > 0:
                     log.warning(
-                        f"Failed to finish all tasks in 1s, [{len(not_done)}/{len(executing_futures)}] ",
-                        f"tasks are not done, waited={wait_interval:.2f}, trying to wait in the next round",
+                        f"Failed to finish all tasks in 1s, [{len(not_done)}/{len(executing_futures)}] "
+                        f"tasks are not done, waited={wait_interval:.2f}, trying to wait in the next round"
                     )
                     executing_futures = list(not_done)
                 else:
                     log.debug(
-                        f"Finished {len(executing_futures)} insert-{config.NUM_PER_BATCH} ",
-                        f"task in 1s, wait_interval={wait_interval:.2f}",
+                        f"Finished {len(executing_futures)} insert-{config.NUM_PER_BATCH} "
+                        f"task in 1s, wait_interval={wait_interval:.2f}"
                     )
                     executing_futures = []
         except Exception as e:
diff --git a/vectordb_bench/backend/runner/read_write_runner.py b/vectordb_bench/backend/runner/read_write_runner.py
index e916f45d..d7584459 100644
--- a/vectordb_bench/backend/runner/read_write_runner.py
+++ b/vectordb_bench/backend/runner/read_write_runner.py
@@ -45,8 +45,8 @@ def __init__(
         self.read_dur_after_write = read_dur_after_write
 
         log.info(
-            f"Init runner, concurencys={concurrencies}, search_stage={search_stage}, ",
-            f"stage_search_dur={read_dur_after_write}",
+            f"Init runner, concurencys={concurrencies}, search_stage={search_stage}, "
+            f"stage_search_dur={read_dur_after_write}"
         )
 
         test_emb = np.stack(dataset.test_data["emb"])
@@ -88,12 +88,10 @@ def run_search(self):
         res, ssearch_dur = self.serial_search_runner.run()
         recall, ndcg, p99_latency = res
         log.info(
-            f"Search after write - Serial search - recall={recall}, ndcg={ndcg}, p99={p99_latency}, ",
+            f"Search after write - Serial search - recall={recall}, ndcg={ndcg}, p99={p99_latency}, "
             f"dur={ssearch_dur:.4f}",
         )
-        log.info(
-            f"Search after wirte - Conc search start, dur for each conc={self.read_dur_after_write}",
-        )
+        log.info(f"Search after wirte - Conc search start, dur for each conc={self.read_dur_after_write}")
         max_qps = self.run_by_dur(self.read_dur_after_write)
         log.info(f"Search after wirte - Conc search finished, max_qps={max_qps}")
 
@@ -157,9 +155,7 @@ def wait_next_target(start: int, target_batch: int) -> bool:
 
             got = wait_next_target(start_batch, target_batch)
             if got is False:
-                log.warning(
-                    f"Abnormal exit, target_batch={target_batch}, start_batch={start_batch}",
-                )
+                log.warning(f"Abnormal exit, target_batch={target_batch}, start_batch={start_batch}")
                 return None
 
             log.info(f"Insert {perc}% done, total batch={total_batch}")
@@ -167,8 +163,8 @@ def wait_next_target(start: int, target_batch: int) -> bool:
             res, ssearch_dur = self.serial_search_runner.run()
             recall, ndcg, p99_latency = res
             log.info(
-                f"[{target_batch}/{total_batch}] Serial search - {perc}% done, recall={recall}, ",
-                f"ndcg={ndcg}, p99={p99_latency}, dur={ssearch_dur:.4f}",
+                f"[{target_batch}/{total_batch}] Serial search - {perc}% done, recall={recall}, "
+                f"ndcg={ndcg}, p99={p99_latency}, dur={ssearch_dur:.4f}"
             )
 
             # Search duration for non-last search stage is carefully calculated.
@@ -183,8 +179,8 @@ def wait_next_target(start: int, target_batch: int) -> bool:
                 each_conc_search_dur = csearch_dur / len(self.concurrencies)
                 if each_conc_search_dur < 30:
                     warning_msg = (
-                        f"Results might be inaccurate, duration[{csearch_dur:.4f}] left for conc-search is too short, ",
-                        f"total available dur={total_dur_between_stages}, serial_search_cost={ssearch_dur}.",
+                        f"Results might be inaccurate, duration[{csearch_dur:.4f}] left for conc-search is too short, "
+                        f"total available dur={total_dur_between_stages}, serial_search_cost={ssearch_dur}."
                     )
                     log.warning(warning_msg)
 
@@ -193,7 +189,7 @@ def wait_next_target(start: int, target_batch: int) -> bool:
                 each_conc_search_dur = 60
 
             log.info(
-                f"[{target_batch}/{total_batch}] Concurrent search - {perc}% start, dur={each_conc_search_dur:.4f}",
+                f"[{target_batch}/{total_batch}] Concurrent search - {perc}% start, dur={each_conc_search_dur:.4f}"
             )
             max_qps = self.run_by_dur(each_conc_search_dur)
             result.append((perc, max_qps, recall, ndcg, p99_latency))
diff --git a/vectordb_bench/backend/runner/serial_runner.py b/vectordb_bench/backend/runner/serial_runner.py
index 7eb59432..08d42e14 100644
--- a/vectordb_bench/backend/runner/serial_runner.py
+++ b/vectordb_bench/backend/runner/serial_runner.py
@@ -40,9 +40,7 @@ def __init__(
     def task(self) -> int:
         count = 0
         with self.db.init():
-            log.info(
-                f"({mp.current_process().name:16}) Start inserting embeddings in batch {config.NUM_PER_BATCH}",
-            )
+            log.info(f"({mp.current_process().name:16}) Start inserting embeddings in batch {config.NUM_PER_BATCH}")
             start = time.perf_counter()
             for data_df in self.dataset:
                 all_metadata = data_df["id"].tolist()
@@ -66,13 +64,11 @@ def task(self) -> int:
                     assert insert_count == len(all_metadata)
                     count += insert_count
                     if count % 100_000 == 0:
-                        log.info(
-                            f"({mp.current_process().name:16}) Loaded {count} embeddings into VectorDB",
-                        )
+                        log.info(f"({mp.current_process().name:16}) Loaded {count} embeddings into VectorDB")
 
             log.info(
-                f"({mp.current_process().name:16}) Finish loading all dataset into VectorDB, ",
-                f"dur={time.perf_counter()-start}",
+                f"({mp.current_process().name:16}) Finish loading all dataset into VectorDB, "
+                f"dur={time.perf_counter()-start}"
             )
             return count
 
@@ -83,8 +79,8 @@ def endless_insert_data(self, all_embeddings: list, all_metadata: list, left_id:
         num_batches = math.ceil(len(all_embeddings) / NUM_PER_BATCH)
 
         log.info(
-            f"({mp.current_process().name:16}) Start inserting {len(all_embeddings)} ",
-            f"embeddings in batch {NUM_PER_BATCH}",
+            f"({mp.current_process().name:16}) Start inserting {len(all_embeddings)} "
+            f"embeddings in batch {NUM_PER_BATCH}"
         )
         count = 0
         for batch_id in range(num_batches):
@@ -94,8 +90,8 @@ def endless_insert_data(self, all_embeddings: list, all_metadata: list, left_id:
             embeddings = all_embeddings[batch_id * NUM_PER_BATCH : (batch_id + 1) * NUM_PER_BATCH]
 
             log.debug(
-                f"({mp.current_process().name:16}) batch [{batch_id:3}/{num_batches}], ",
-                f"Start inserting {len(metadata)} embeddings",
+                f"({mp.current_process().name:16}) batch [{batch_id:3}/{num_batches}], "
+                f"Start inserting {len(metadata)} embeddings"
             )
             while retry_count < LOAD_MAX_TRY_COUNT:
                 insert_count, error = self.db.insert_embeddings(
@@ -113,15 +109,15 @@ def endless_insert_data(self, all_embeddings: list, all_metadata: list, left_id:
                 else:
                     break
             log.debug(
-                f"({mp.current_process().name:16}) batch [{batch_id:3}/{num_batches}], ",
-                f"Finish inserting {len(metadata)} embeddings",
+                f"({mp.current_process().name:16}) batch [{batch_id:3}/{num_batches}], "
+                f"Finish inserting {len(metadata)} embeddings"
             )
 
             assert already_insert_count == len(metadata)
             count += already_insert_count
 
         log.info(
-            f"({mp.current_process().name:16}) Finish inserting {len(all_embeddings)} embeddings in ",
-            f"batch {NUM_PER_BATCH}",
+            f"({mp.current_process().name:16}) Finish inserting {len(all_embeddings)} embeddings in "
+            f"batch {NUM_PER_BATCH}"
         )
         return count
 
@@ -171,13 +167,13 @@ def run_endlessness(self) -> int:
                 max_load_count += count
                 times += 1
                 log.info(
-                    f"Loaded {times} entire dataset, current max load counts={utils.numerize(max_load_count)}, ",
-                    f"{max_load_count}",
+                    f"Loaded {times} entire dataset, current max load counts={utils.numerize(max_load_count)}, "
+                    f"{max_load_count}"
                 )
         except Exception as e:
             log.info(
-                f"Capacity case load reach limit, insertion counts={utils.numerize(max_load_count)}, ",
-                f"{max_load_count}, err={e}",
+                f"Capacity case load reach limit, insertion counts={utils.numerize(max_load_count)}, "
+                f"{max_load_count}, err={e}"
             )
             traceback.print_exc()
             return max_load_count
@@ -209,9 +205,7 @@ def __init__(
         self.ground_truth = ground_truth
 
     def search(self, args: tuple[list, pd.DataFrame]) -> tuple[float, float, float]:
-        log.info(
-            f"{mp.current_process().name:14} start search the entire test_data to get recall and latency",
-        )
+        log.info(f"{mp.current_process().name:14} start search the entire test_data to get recall and latency")
         with self.db.init():
             test_data, ground_truth = args
             ideal_dcg = get_ideal_dcg(self.k)
@@ -242,8 +236,8 @@ def search(self, args: tuple[list, pd.DataFrame]) -> tuple[float, float, float]:
 
                 if len(latencies) % 100 == 0:
                     log.debug(
-                        f"({mp.current_process().name:14}) search_count={len(latencies):3}, ",
-                        f"latest_latency={latencies[-1]}, latest recall={recalls[-1]}",
+                        f"({mp.current_process().name:14}) search_count={len(latencies):3}, "
+                        f"latest_latency={latencies[-1]}, latest recall={recalls[-1]}"
                     )
 
         avg_latency = round(np.mean(latencies), 4)
@@ -258,7 +252,7 @@ def search(self, args: tuple[list, pd.DataFrame]) -> tuple[float, float, float]:
             f"avg_recall={avg_recall}, "
             f"avg_ndcg={avg_ndcg},"
             f"avg_latency={avg_latency}, "
-            f"p99={p99}",
+            f"p99={p99}"
         )
 
         return (avg_recall, avg_ndcg, p99)
diff --git a/vectordb_bench/backend/task_runner.py b/vectordb_bench/backend/task_runner.py
index e24d74f0..e8be9f07 100644
--- a/vectordb_bench/backend/task_runner.py
+++ b/vectordb_bench/backend/task_runner.py
@@ -98,9 +98,7 @@ def _pre_run(self, drop_old: bool = True):
             self.init_db(drop_old)
             self.ca.dataset.prepare(self.dataset_source, filters=self.ca.filter_rate)
         except ModuleNotFoundError as e:
-            log.warning(
-                f"pre run case error: please install client for db: {self.config.db}, error={e}",
-            )
+            log.warning(f"pre run case error: please install client for db: {self.config.db}, error={e}")
             raise e from None
 
     def run(self, drop_old: bool = True) -> Metric:
@@ -136,9 +134,7 @@ def _run_capacity_case(self) -> Metric:
             log.warning(f"Failed to run capacity case, reason = {e}")
             raise e from None
         else:
-            log.info(
-                f"Capacity case loading dataset reaches VectorDB's limit: max capacity = {count}",
-            )
+            log.info(f"Capacity case loading dataset reaches VectorDB's limit: max capacity = {count}")
             return Metric(max_load_count=count)
 
     def _run_perf_case(self, drop_old: bool = True) -> Metric:
@@ -147,22 +143,6 @@ def _run_perf_case(self, drop_old: bool = True) -> Metric:
         Returns:
             Metric: load_duration, recall, serial_latency_p99, and, qps
         """
-        """
-        if drop_old:
-            _, load_dur = self._load_train_data()
-            build_dur = self._optimize()
-            m.load_duration = round(load_dur+build_dur, 4)
-            log.info(
-                f"Finish loading the entire dataset into VectorDB,"
-                f" insert_duration={load_dur}, optimize_duration={build_dur}"
-                f" load_duration(insert + optimize) = {m.load_duration}"
-            )
-
-        self._init_search_runner()
-
-        m.qps, m.conc_num_list, m.conc_qps_list, m.conc_latency_p99_list = self._conc_search()
-        m.recall, m.serial_latency_p99 = self._serial_search()
-        """
 
         log.info("Start performance case")
         try:
@@ -175,7 +155,7 @@ def _run_perf_case(self, drop_old: bool = True) -> Metric:
                 log.info(
                     f"Finish loading the entire dataset into VectorDB,"
                     f" insert_duration={load_dur}, optimize_duration={build_dur}"
-                    f" load_duration(insert + optimize) = {m.load_duration}",
+                    f" load_duration(insert + optimize) = {m.load_duration}"
                 )
             else:
                 log.info("Data loading skipped")
diff --git a/vectordb_bench/interface.py b/vectordb_bench/interface.py
index ebe12d2e..2e573fdc 100644
--- a/vectordb_bench/interface.py
+++ b/vectordb_bench/interface.py
@@ -65,9 +65,7 @@ def run(self, tasks: list[TaskConfig], task_label: str | None = None) -> bool:
             log.warning("Empty tasks submitted")
             return False
 
-        log.debug(
-            f"tasks: {tasks}, task_label: {task_label}, dataset source: {self.dataset_source}",
-        )
+        log.debug(f"tasks: {tasks}, task_label: {task_label}, dataset source: {self.dataset_source}")
 
         # Generate run_id
         run_id = uuid.uuid4().hex
@@ -169,14 +167,13 @@ def _async_task_v2(self, running_task: TaskRunner, send_conn: Connection) -> Non
                 drop_old = TaskStage.DROP_OLD in runner.config.stages
                 if (latest_runner and runner == latest_runner) or not self.drop_old:
                     drop_old = False
+                num_cases = running_task.num_cases()
                 try:
-                    log.info(
-                        f"[{idx+1}/{running_task.num_cases()}] start case: {runner.display()}, drop_old={drop_old}",
-                    )
+                    log.info(f"[{idx+1}/{num_cases}] start case: {runner.display()}, drop_old={drop_old}")
                     case_res.metrics = runner.run(drop_old)
                     log.info(
-                        f"[{idx+1}/{running_task.num_cases()}] finish case: {runner.display()}, "
-                        f"result={case_res.metrics}, label={case_res.label}",
+                        f"[{idx+1}/{num_cases}] finish case: {runner.display()}, "
+                        f"result={case_res.metrics}, label={case_res.label}"
                     )
 
                     # cache the latest succeeded runner
@@ -189,16 +186,12 @@ def _async_task_v2(self, running_task: TaskRunner, send_conn: Connection) -> Non
                     if not drop_old:
                         case_res.metrics.load_duration = cached_load_duration if cached_load_duration else 0.0
                 except (LoadTimeoutError, PerformanceTimeoutError) as e:
-                    log.warning(
-                        f"[{idx+1}/{running_task.num_cases()}] case {runner.display()} failed to run, reason={e}",
-                    )
+                    log.warning(f"[{idx+1}/{num_cases}] case {runner.display()} failed to run, reason={e}")
                    case_res.label = ResultLabel.OUTOFRANGE
                    continue
 
                 except Exception as e:
-                    log.warning(
-                        f"[{idx+1}/{running_task.num_cases()}] case {runner.display()} failed to run, reason={e}",
-                    )
+                    log.warning(f"[{idx+1}/{num_cases}] case {runner.display()} failed to run, reason={e}")
                     traceback.print_exc()
                     case_res.label = ResultLabel.FAILED
                     continue
@@ -217,9 +210,7 @@ def _async_task_v2(self, running_task: TaskRunner, send_conn: Connection) -> Non
 
             send_conn.send((SIGNAL.SUCCESS, None))
             send_conn.close()
-            log.info(
-                f"Success to finish task: label={running_task.task_label}, run_id={running_task.run_id}",
-            )
+            log.info(f"Success to finish task: label={running_task.task_label}, run_id={running_task.run_id}")
 
         except Exception as e:
             err_msg = (
@@ -250,7 +241,7 @@ def _clear_running_task(self):
     def _run_async(self, conn: Connection) -> bool:
         log.info(
             f"task submitted: id={self.running_task.run_id}, {self.running_task.task_label}, "
-            f"case number: {len(self.running_task.case_runners)}",
+            f"case number: {len(self.running_task.case_runners)}"
         )
         global global_result_future
         executor = concurrent.futures.ProcessPoolExecutor(