diff --git a/.gitignore b/.gitignore
index 2f34f10ee..004524444 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,3 +7,5 @@ __pycache__
__MACOSX
.DS_Store
build/
+venv/
+.idea/
diff --git a/tests/test_models.py b/tests/test_models.py
index 555b0dbab..c61774a22 100644
--- a/tests/test_models.py
+++ b/tests/test_models.py
@@ -65,6 +65,6 @@ def test_test_result_merge(self):
def test_test_result_display(self):
result_dir = config.RESULTS_LOCAL_DIR
- for json_file in result_dir.glob("*.json"):
+ for json_file in result_dir.glob("result*.json"):
res = TestResult.read_file(json_file)
res.display()
diff --git a/vectordb_bench/__init__.py b/vectordb_bench/__init__.py
index 7a5df57bd..a9421a048 100644
--- a/vectordb_bench/__init__.py
+++ b/vectordb_bench/__init__.py
@@ -18,12 +18,23 @@ class config:
USE_SHUFFLED_DATA = env.bool("USE_SHUFFLED_DATA", True)
RESULTS_LOCAL_DIR = pathlib.Path(__file__).parent.joinpath("results")
- CASE_TIMEOUT_IN_SECOND = 24 * 60 * 60
+
+ CAPACITY_TIMEOUT_IN_SECONDS = 24 * 3600 # 24h
+ LOAD_TIMEOUT_1M = 1.5 * 3600 # 1.5h
+ LOAD_TIMEOUT_10M = 15 * 3600 # 15h
+ LOAD_TIMEOUT_100M = 150 * 3600 # 6.25d
+
+ OPTIMIZE_TIMEOUT_1M = 15 * 60 # 15min
+ OPTIMIZE_TIMEOUT_10M = 2.5 * 3600 # 2.5h
+ OPTIMIZE_TIMEOUT_100M = 25 * 3600 # 1.04d
def display(self) -> str:
- tmp = [i for i in inspect.getmembers(self)
- if not inspect.ismethod(i[1]) and not i[0].startswith('_') \
+ tmp = [
+ i for i in inspect.getmembers(self)
+ if not inspect.ismethod(i[1])
+ and not i[0].startswith('_')
+ and "TIMEOUT" not in i[0]
]
return tmp
diff --git a/vectordb_bench/backend/cases.py b/vectordb_bench/backend/cases.py
index 639ab2984..56b38e3f4 100644
--- a/vectordb_bench/backend/cases.py
+++ b/vectordb_bench/backend/cases.py
@@ -2,8 +2,10 @@
import logging
from enum import Enum, auto
+from vectordb_bench import config
+from vectordb_bench.base import BaseModel
+
from .dataset import Dataset, DatasetManager
-from ..base import BaseModel
log = logging.getLogger(__name__)
@@ -75,6 +77,9 @@ class Case(BaseModel):
description: str
dataset: DatasetManager
+ load_timeout: float | int
+ optimize_timeout: float | int | None
+
filter_rate: float | None
@property
@@ -92,6 +97,8 @@ def filters(self) -> dict | None:
class CapacityCase(Case, BaseModel):
label: CaseLabel = CaseLabel.Load
filter_rate: float | None = None
+ load_timeout: float | int = config.CAPACITY_TIMEOUT_IN_SECONDS
+ optimize_timeout: float | int | None = None
class PerformanceCase(Case, BaseModel):
@@ -121,6 +128,8 @@ class Performance10M(PerformanceCase):
name: str = "Search Performance Test (10M Dataset, 768 Dim)"
description: str = """This case tests the search performance of a vector database with a large dataset (Cohere 10M vectors, 768 dimensions) at varying parallel levels.
Results will show index building time, recall, and maximum QPS."""
+ load_timeout: float | int = config.LOAD_TIMEOUT_10M
+ optimize_timeout: float | int | None = config.OPTIMIZE_TIMEOUT_10M
class Performance1M(PerformanceCase):
@@ -129,6 +138,8 @@ class Performance1M(PerformanceCase):
name: str = "Search Performance Test (1M Dataset, 768 Dim)"
description: str = """This case tests the search performance of a vector database with a medium dataset (Cohere 1M vectors, 768 dimensions) at varying parallel levels.
Results will show index building time, recall, and maximum QPS."""
+ load_timeout: float | int = config.LOAD_TIMEOUT_1M
+ optimize_timeout: float | int | None = config.OPTIMIZE_TIMEOUT_1M
class Performance10M1P(PerformanceCase):
@@ -138,6 +149,8 @@ class Performance10M1P(PerformanceCase):
name: str = "Filtering Search Performance Test (10M Dataset, 768 Dim, Filter 1%)"
description: str = """This case tests the search performance of a vector database with a large dataset (Cohere 10M vectors, 768 dimensions) under a low filtering rate (1% vectors), at varying parallel levels.
Results will show index building time, recall, and maximum QPS."""
+ load_timeout: float | int = config.LOAD_TIMEOUT_10M
+ optimize_timeout: float | int | None = config.OPTIMIZE_TIMEOUT_10M
class Performance1M1P(PerformanceCase):
@@ -147,6 +160,8 @@ class Performance1M1P(PerformanceCase):
name: str = "Filtering Search Performance Test (1M Dataset, 768 Dim, Filter 1%)"
description: str = """This case tests the search performance of a vector database with a medium dataset (Cohere 1M vectors, 768 dimensions) under a low filtering rate (1% vectors), at varying parallel levels.
Results will show index building time, recall, and maximum QPS."""
+ load_timeout: float | int = config.LOAD_TIMEOUT_1M
+ optimize_timeout: float | int | None = config.OPTIMIZE_TIMEOUT_1M
class Performance10M99P(PerformanceCase):
@@ -156,6 +171,8 @@ class Performance10M99P(PerformanceCase):
name: str = "Filtering Search Performance Test (10M Dataset, 768 Dim, Filter 99%)"
description: str = """This case tests the search performance of a vector database with a large dataset (Cohere 10M vectors, 768 dimensions) under a high filtering rate (99% vectors), at varying parallel levels.
Results will show index building time, recall, and maximum QPS."""
+ load_timeout: float | int = config.LOAD_TIMEOUT_10M
+ optimize_timeout: float | int | None = config.OPTIMIZE_TIMEOUT_10M
class Performance1M99P(PerformanceCase):
@@ -165,6 +182,8 @@ class Performance1M99P(PerformanceCase):
name: str = "Filtering Search Performance Test (1M Dataset, 768 Dim, Filter 99%)"
description: str = """This case tests the search performance of a vector database with a medium dataset (Cohere 1M vectors, 768 dimensions) under a high filtering rate (99% vectors), at varying parallel levels.
Results will show index building time, recall, and maximum QPS."""
+ load_timeout: float | int = config.LOAD_TIMEOUT_1M
+ optimize_timeout: float | int | None = config.OPTIMIZE_TIMEOUT_1M
@@ -175,6 +194,8 @@ class Performance100M(PerformanceCase):
name: str = "Search Performance Test (100M Dataset, 768 Dim)"
description: str = """This case tests the search performance of a vector database with a large 100M dataset (LAION 100M vectors, 768 dimensions), at varying parallel levels.
Results will show index building time, recall, and maximum QPS."""
+ load_timeout: float | int = config.LOAD_TIMEOUT_100M
+ optimize_timeout: float | int | None = config.OPTIMIZE_TIMEOUT_100M
type2case = {
diff --git a/vectordb_bench/backend/clients/api.py b/vectordb_bench/backend/clients/api.py
index 49ec18df1..74bafbece 100644
--- a/vectordb_bench/backend/clients/api.py
+++ b/vectordb_bench/backend/clients/api.py
@@ -73,7 +73,7 @@ class VectorDB(ABC):
In each process, the benchmark cases ensure VectorDB.init() calls before any other methods operations
- insert_embeddings, search_embedding, and, ready_to_search will be timed for each call.
+ insert_embeddings, search_embedding, and, optimize will be timed for each call.
Examples:
>>> milvus = Milvus()
@@ -166,13 +166,14 @@ def search_embedding(
# TODO: remove
@abstractmethod
- def ready_to_search(self):
- """ready_to_search will be called between insertion and search in performance cases.
+ def optimize(self):
+ """optimize will be called between insertion and search in performance cases.
Should be blocked until the vectorDB is ready to be tested on
heavy performance cases.
- Time(insert the dataset) + Time(ready_to_search) will be recorded as "load_duration" metric
+ Time(insert the dataset) + Time(optimize) will be recorded as "load_duration" metric
+ The execution time of optimize is limited; the limit depends on the case (its optimize_timeout).
"""
raise NotImplementedError
diff --git a/vectordb_bench/backend/clients/elastic_cloud/elastic_cloud.py b/vectordb_bench/backend/clients/elastic_cloud/elastic_cloud.py
index 1f88233e8..8cd928399 100644
--- a/vectordb_bench/backend/clients/elastic_cloud/elastic_cloud.py
+++ b/vectordb_bench/backend/clients/elastic_cloud/elastic_cloud.py
@@ -143,8 +143,8 @@ def search_embedding(
log.warning(f"Failed to search: {self.indice} error: {str(e)}")
raise e from None
- def ready_to_search(self):
- """ready_to_search will be called between insertion and search in performance cases."""
+ def optimize(self):
+ """optimize will be called between insertion and search in performance cases."""
pass
def ready_to_load(self):
diff --git a/vectordb_bench/backend/clients/milvus/milvus.py b/vectordb_bench/backend/clients/milvus/milvus.py
index fcb5b324e..d6afa38dc 100644
--- a/vectordb_bench/backend/clients/milvus/milvus.py
+++ b/vectordb_bench/backend/clients/milvus/milvus.py
@@ -53,7 +53,7 @@ def __init__(
log.info(f"{self.name} create collection: {self.collection_name}")
# Create the collection
- coll = Collection(
+ Collection(
name=self.collection_name,
schema=CollectionSchema(fields),
consistency_level="Session",
@@ -107,6 +107,14 @@ def _pre_load(self, coll: Collection):
def _optimize(self):
log.info(f"{self.name} optimizing before search")
+ try:
+ self.col.load()
+ except Exception as e:
+ log.warning(f"{self.name} optimize error: {e}")
+ raise e from None
+
+ def _post_insert(self):
+ log.info(f"{self.name} post insert before optimize")
try:
self.col.flush()
self.col.compact()
@@ -119,10 +127,6 @@ def _optimize(self):
index_name=self._index_name,
)
utility.wait_for_index_building_complete(self.collection_name)
- self.col.load()
- # self.col.load(_refresh=True)
- # utility.wait_for_loading_complete(self.collection_name)
- # import time; time.sleep(10)
except Exception as e:
log.warning(f"{self.name} optimize error: {e}")
raise e from None
@@ -132,7 +136,7 @@ def ready_to_load(self):
self._pre_load(self.col)
pass
- def ready_to_search(self):
+ def optimize(self):
assert self.col, "Please call self.init() before"
self._optimize()
@@ -157,6 +161,8 @@ def insert_embeddings(
]
res = self.col.insert(insert_data, **kwargs)
insert_count += len(res.primary_keys)
+ if kwargs.get("last_batch"):
+ self._post_insert()
except MilvusException as e:
log.warning("Failed to insert data")
return (insert_count, e)
diff --git a/vectordb_bench/backend/clients/pinecone/pinecone.py b/vectordb_bench/backend/clients/pinecone/pinecone.py
index 9cafd0466..1f2b4edab 100644
--- a/vectordb_bench/backend/clients/pinecone/pinecone.py
+++ b/vectordb_bench/backend/clients/pinecone/pinecone.py
@@ -69,7 +69,7 @@ def init(self) -> None:
def ready_to_load(self):
pass
- def ready_to_search(self):
+ def optimize(self):
pass
def insert_embeddings(
diff --git a/vectordb_bench/backend/clients/qdrant_cloud/qdrant_cloud.py b/vectordb_bench/backend/clients/qdrant_cloud/qdrant_cloud.py
index 6dcbc8f03..330bc396f 100644
--- a/vectordb_bench/backend/clients/qdrant_cloud/qdrant_cloud.py
+++ b/vectordb_bench/backend/clients/qdrant_cloud/qdrant_cloud.py
@@ -74,7 +74,7 @@ def ready_to_load(self):
pass
- def ready_to_search(self):
+ def optimize(self):
assert self.qdrant_client, "Please call self.init() before"
# wait for vectors to be fully indexed
SECONDS_WAITING_FOR_INDEXING_API_CALL = 5
diff --git a/vectordb_bench/backend/clients/weaviate_cloud/weaviate_cloud.py b/vectordb_bench/backend/clients/weaviate_cloud/weaviate_cloud.py
index 5852dfeb6..38f87c7e8 100644
--- a/vectordb_bench/backend/clients/weaviate_cloud/weaviate_cloud.py
+++ b/vectordb_bench/backend/clients/weaviate_cloud/weaviate_cloud.py
@@ -70,7 +70,7 @@ def ready_to_load(self):
"""Should call insert first, do nothing"""
pass
- def ready_to_search(self):
+ def optimize(self):
assert self.client.schema.exists(self.collection_name)
self.client.schema.update_config(self.collection_name, {"vectorIndexConfig": self.case_config.search_param() } )
diff --git a/vectordb_bench/backend/runner/serial_runner.py b/vectordb_bench/backend/runner/serial_runner.py
index 03fb180f0..ba0d33823 100644
--- a/vectordb_bench/backend/runner/serial_runner.py
+++ b/vectordb_bench/backend/runner/serial_runner.py
@@ -4,77 +4,90 @@
import concurrent
import multiprocessing as mp
import math
+import psutil
+
import numpy as np
import pandas as pd
from ..clients import api
from ...metric import calc_recall
-from ...models import LoadTimeoutError
+from ...models import LoadTimeoutError, PerformanceTimeoutError
from .. import utils
from ... import config
+from vectordb_bench.backend.dataset import DatasetManager
NUM_PER_BATCH = config.NUM_PER_BATCH
-LOAD_TIMEOUT = 24 * 60 * 60
LOAD_MAX_TRY_COUNT = 10
WAITTING_TIME = 60
log = logging.getLogger(__name__)
-
class SerialInsertRunner:
- def __init__(self, db: api.VectorDB, train_emb: list[list[float]], train_id: list[int]):
- log.debug(f"Dataset shape: {len(train_emb)}")
+ def __init__(self, db: api.VectorDB, dataset: DatasetManager, normalize: bool, timeout: float | None = None):
+ self.timeout = timeout if isinstance(timeout, (int, float)) else None
+ self.dataset = dataset
self.db = db
- self.shared_emb = train_emb
- self.train_id = train_id
+ self.normalize = normalize
- self.seq_batches = math.ceil(len(train_emb)/NUM_PER_BATCH)
+ def task(self):
+ count = 0
+ for data_df in self.dataset:
+ all_metadata = data_df['id'].tolist()
- def insert_data(self, left_id: int = 0) -> int:
- with self.db.init():
- all_embeddings = self.shared_emb
+            emb_np = np.stack(data_df['emb'])
+            if self.normalize:
+                log.debug("normalize the 100k train data")
+                # Parenthesized so .tolist() converts the normalized matrix itself,
+                # not only the norm column used as the divisor.
+                all_embeddings = (emb_np / np.linalg.norm(emb_np, axis=1)[:, np.newaxis]).tolist()
+            else:
+                all_embeddings = emb_np.tolist()
+            del(emb_np)
+            log.debug(f"batch dataset size: {len(all_embeddings)}, {len(all_metadata)}")
+
+            # True only for the file whose rows complete the dataset.
+            last = self.dataset.data.size - count == len(all_metadata)
+            # Pass the database handle for the `db` parameter, not the runner itself.
+            count += self._insert_data(self.db, all_embeddings, all_metadata, last)
+        # After the loop over all dataset files: return the total number of inserted
+        # rows so that future.result() in _insert_all_batches yields an int, not None.
+        return count
- # unique id for endlessness insertion
- all_metadata = [i+left_id for i in self.train_id]
- num_conc_batches = math.ceil(len(all_embeddings)/NUM_PER_BATCH)
+ def _insert_data(self, db: api.VectorDB, all_embeddings: list[list[float]], all_metadata: list[int], last: bool) -> int:
+ NUM_BATCHES = math.ceil(len(all_embeddings)/NUM_PER_BATCH)
+
+ with self.db.init():
log.info(f"({mp.current_process().name:16}) Start inserting {len(all_embeddings)} embeddings in batch {NUM_PER_BATCH}")
count = 0
- for batch_id in range(self.seq_batches):
+ for batch_id in range(NUM_BATCHES):
metadata = all_metadata[batch_id*NUM_PER_BATCH: (batch_id+1)*NUM_PER_BATCH]
embeddings = all_embeddings[batch_id*NUM_PER_BATCH: (batch_id+1)*NUM_PER_BATCH]
- log.debug(f"({mp.current_process().name:16}) batch [{batch_id:3}/{num_conc_batches}], Start inserting {len(metadata)} embeddings")
+ last_batch = last and (batch_id == NUM_BATCHES - 1)
+ log.debug(f"({mp.current_process().name:16}) batch [{batch_id:3}/{NUM_BATCHES}], Start inserting {len(metadata)} embeddings")
insert_count, error = self.db.insert_embeddings(
embeddings=embeddings,
metadata=metadata,
+ last_batch=last_batch,
)
if error is not None:
raise error
- log.debug(f"({mp.current_process().name:16}) batch [{batch_id:3}/{num_conc_batches}], Finish inserting {len(metadata)} embeddings")
+ log.debug(f"({mp.current_process().name:16}) batch [{batch_id:3}/{NUM_BATCHES}], Finish inserting {len(metadata)} embeddings")
assert insert_count == len(metadata)
count += insert_count
log.info(f"({mp.current_process().name:16}) Finish inserting {len(all_embeddings)} embeddings in batch {NUM_PER_BATCH}")
return count
- def endless_insert_data(self, left_id: int = 0) -> int:
+ def endless_insert_data(self, all_embeddings, all_metadata, left_id: int = 0) -> int:
with self.db.init():
- all_embeddings = self.shared_emb
-
# unique id for endlessness insertion
- all_metadata = [i+left_id for i in self.train_id]
+ all_metadata = [i+left_id for i in all_metadata]
- num_conc_batches = math.ceil(len(all_embeddings)/NUM_PER_BATCH)
+ NUM_BATCHES = math.ceil(len(all_embeddings)/NUM_PER_BATCH)
log.info(f"({mp.current_process().name:16}) Start inserting {len(all_embeddings)} embeddings in batch {NUM_PER_BATCH}")
count = 0
- for batch_id in range(self.seq_batches):
+ for batch_id in range(NUM_BATCHES):
retry_count = 0
already_insert_count = 0
metadata = all_metadata[batch_id*NUM_PER_BATCH : (batch_id+1)*NUM_PER_BATCH]
embeddings = all_embeddings[batch_id*NUM_PER_BATCH : (batch_id+1)*NUM_PER_BATCH]
- log.debug(f"({mp.current_process().name:16}) batch [{batch_id:3}/{num_conc_batches}], Start inserting {len(metadata)} embeddings")
+ log.debug(f"({mp.current_process().name:16}) batch [{batch_id:3}/{NUM_BATCHES}], Start inserting {len(metadata)} embeddings")
while retry_count < LOAD_MAX_TRY_COUNT:
insert_count, error = self.db.insert_embeddings(
embeddings=embeddings[already_insert_count :],
@@ -90,7 +103,7 @@ def endless_insert_data(self, left_id: int = 0) -> int:
raise error
else:
break
- log.debug(f"({mp.current_process().name:16}) batch [{batch_id:3}/{num_conc_batches}], Finish inserting {len(metadata)} embeddings")
+ log.debug(f"({mp.current_process().name:16}) batch [{batch_id:3}/{NUM_BATCHES}], Finish inserting {len(metadata)} embeddings")
assert already_insert_count == len(metadata)
count += already_insert_count
@@ -101,30 +114,46 @@ def endless_insert_data(self, left_id: int = 0) -> int:
def _insert_all_batches(self) -> int:
"""Performance case only"""
with concurrent.futures.ProcessPoolExecutor(mp_context=mp.get_context('spawn'), max_workers=1) as executor:
- future = executor.submit(self.insert_data)
- count = future.result()
- return count
+            future = executor.submit(self.task)
+            try:
+                count = future.result(timeout=self.timeout)
+            except concurrent.futures.TimeoutError as e:
+                # Catch the executor's own exception class: before Python 3.11 it is
+                # not the builtin TimeoutError, so the builtin name would miss it.
+                msg = f"VectorDB load dataset timeout in {self.timeout}"
+                log.warning(msg)
+                # Kill the worker process(es) so the stuck load does not keep running.
+                for pid, _ in executor._processes.items():
+                    psutil.Process(pid).kill()
+                raise PerformanceTimeoutError(msg) from e
+            except Exception as e:
+                # This is the load path, so report a load error (not an optimize error).
+                log.warning(f"VectorDB load dataset error: {e}")
+                raise e from None
+            else:
+                return count
def run_endlessness(self) -> int:
"""run forever util DB raises exception or crash"""
+ # datasets for load tests are quite small, can fit into memory
+ # only 1 file
+ data_df = [data_df for data_df in self.dataset][0]
+ all_embeddings, all_metadata = np.stack(data_df["emb"]).tolist(), data_df['id'].tolist()
+
start_time = time.perf_counter()
max_load_count, times = 0, 0
try:
with self.db.init():
self.db.ready_to_load()
- while time.perf_counter() - start_time < config.CASE_TIMEOUT_IN_SECOND:
- count = self.endless_insert_data(left_id=max_load_count)
+                # self.timeout is None when no numeric timeout was given to __init__;
+                # treat that as "no time limit" instead of comparing a float with None.
+                while self.timeout is None or time.perf_counter() - start_time < self.timeout:
+ count = self.endless_insert_data(all_embeddings, all_metadata, left_id=max_load_count)
max_load_count += count
times += 1
log.info(f"Loaded {times} entire dataset, current max load counts={utils.numerize(max_load_count)}, {max_load_count}")
- raise LoadTimeoutError("capacity case load timeout and stop")
- except LoadTimeoutError as e:
- log.info("load timetout, stop the load case")
- raise e from None
except Exception as e:
log.info(f"Capacity case load reach limit, insertion counts={utils.numerize(max_load_count)}, {max_load_count}, err={e}")
traceback.print_exc()
return max_load_count
+ else:
+ msg = f"capacity case load timeout in {self.timeout}s"
+ log.info(msg)
+ raise LoadTimeoutError(msg)
def run(self) -> int:
count, dur = self._insert_all_batches()
diff --git a/vectordb_bench/backend/task_runner.py b/vectordb_bench/backend/task_runner.py
index f6ebbeed9..034770f30 100644
--- a/vectordb_bench/backend/task_runner.py
+++ b/vectordb_bench/backend/task_runner.py
@@ -1,4 +1,5 @@
import logging
+import psutil
import traceback
import concurrent
import numpy as np
@@ -7,7 +8,7 @@
from . import utils
from .cases import Case, CaseLabel
from ..base import BaseModel
-from ..models import TaskConfig
+from ..models import TaskConfig, PerformanceTimeoutError
from .clients import (
api,
@@ -92,38 +93,37 @@ def run(self, drop_old: bool = True) -> Metric:
self._pre_run(drop_old)
if self.ca.label == CaseLabel.Load:
- return self._run_load_case()
+ return self._run_capacity_case()
elif self.ca.label == CaseLabel.Performance:
return self._run_perf_case(drop_old)
else:
- log.warning(f"unknown case type: {self.ca.label}")
- raise ValueError(f"Unknown case type: {self.ca.label}")
+ msg = f"unknown case type: {self.ca.label}"
+ log.warning(msg)
+ raise ValueError(msg)
-
- def _run_load_case(self) -> Metric:
- """ run load cases
+ def _run_capacity_case(self) -> Metric:
+ """ run capacity cases
Returns:
Metric: the max load count
"""
log.info("Start capacity case")
- # datasets for load tests are quite small, can fit into memory
- # only 1 file
- data_df = [data_df for data_df in self.ca.dataset][0]
-
- all_embeddings, all_metadata = np.stack(data_df["emb"]).tolist(), data_df['id'].tolist()
- runner = SerialInsertRunner(self.db, all_embeddings, all_metadata)
try:
+ runner = SerialInsertRunner(self.db, self.ca.dataset, self.normalize, self.ca.load_timeout)
count = runner.run_endlessness()
- log.info(f"load reach limit: insertion counts={count}")
- return Metric(max_load_count=count)
except Exception as e:
- log.warning(f"run capacity case error: {e}")
+ log.warning(f"Failed to run capacity case, reason = {e}")
raise e from None
- log.info("End capacity case")
-
+ else:
+ log.info(f"Capacity case loading dataset reaches VectorDB's limit: max capacity = {count}")
+ return Metric(max_load_count=count)
def _run_perf_case(self, drop_old: bool = True) -> Metric:
+ """ run performance cases
+
+ Returns:
+ Metric: load_duration, recall, serial_latency_p99, and, qps
+ """
try:
m = Metric()
if drop_old:
@@ -134,38 +134,24 @@ def _run_perf_case(self, drop_old: bool = True) -> Metric:
self._init_search_runner()
m.recall, m.serial_latency_p99 = self._serial_search()
m.qps = self._conc_search()
-
- log.info(f"got results: {m}")
- return m
except Exception as e:
- log.warning(f"performance case run error: {e}")
+ log.warning(f"Failed to run performance case, reason = {e}")
traceback.print_exc()
- raise e
+ raise e from None
+ else:
+ log.info(f"Performance case got result: {m}")
+ return m
@utils.time_it
def _load_train_data(self):
"""Insert train data and get the insert_duration"""
- for data_df in self.ca.dataset:
- try:
- all_metadata = data_df['id'].tolist()
-
- emb_np = np.stack(data_df['emb'])
- if self.normalize:
- log.debug("normalize the 100k train data")
- all_embeddings = emb_np / np.linalg.norm(emb_np, axis=1)[:, np.newaxis].tolist()
- else:
- all_embeddings = emb_np.tolist()
-
- del(emb_np)
- log.debug(f"normalized size: {len(all_embeddings)}, {len(all_metadata)}")
-
- runner = SerialInsertRunner(self.db, all_embeddings, all_metadata)
- runner.run()
- except Exception as e:
- raise e from None
- finally:
- runner = None
-
+ try:
+ runner = SerialInsertRunner(self.db, self.ca.dataset, self.normalize, self.ca.load_timeout)
+ runner.run()
+ except Exception as e:
+ raise e from None
+ finally:
+ runner = None
def _serial_search(self) -> tuple[float, float]:
"""Performance serial tests, search the entire test data once,
@@ -200,15 +186,20 @@ def _conc_search(self):
def _task(self) -> None:
""""""
with self.db.init():
- self.db.ready_to_search()
+ self.db.optimize()
def _optimize(self) -> float:
with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
future = executor.submit(self._task)
try:
- return future.result()[1]
+ return future.result(timeout=self.ca.optimize_timeout)[1]
+            except concurrent.futures.TimeoutError as e:
+                # Catch the executor's own exception class: before Python 3.11 it is
+                # not the builtin TimeoutError, so the builtin name would miss it.
+                log.warning(f"VectorDB optimize timeout in {self.ca.optimize_timeout}")
+                # Kill the worker process(es) so the stuck optimize does not keep running.
+                for pid, _ in executor._processes.items():
+                    psutil.Process(pid).kill()
+                raise PerformanceTimeoutError("Performance case optimize timeout") from e
except Exception as e:
- log.warning(f"VectorDB ready_to_search error: {e}")
+ log.warning(f"VectorDB optimize error: {e}")
raise e from None
def _init_search_runner(self):
diff --git a/vectordb_bench/models.py b/vectordb_bench/models.py
index 821d2ff6d..bbd2f5010 100644
--- a/vectordb_bench/models.py
+++ b/vectordb_bench/models.py
@@ -167,20 +167,21 @@ def append_return(x, y):
max_qps = max(map(len, [str(f.metrics.qps) for f in filtered_results])) + 3
max_recall = max(map(len, [str(f.metrics.recall) for f in filtered_results])) + 3
- max_db_labels = 8 if max_db_labels == 0 else max_db_labels
- max_load_dur = 11 if max_load_dur == 0 else max_load_dur + 3
- max_qps = 10 if max_qps == 0 else max_load_dur + 3
- max_recall = 13 if max_recall == 0 else max_recall + 3
+ max_db_labels = 8 if max_db_labels < 8 else max_db_labels
+ max_load_dur = 11 if max_load_dur < 11 else max_load_dur
+ max_qps = 10 if max_qps < 10 else max_qps
+ max_recall = 13 if max_recall < 13 else max_recall
- LENGTH = (max_db, max_db_labels, max_case, len(self.task_label), max_load_dur, max_qps, 15, max_recall, 14)
+ LENGTH = (max_db, max_db_labels, max_case, len(self.task_label), max_load_dur, max_qps, 15, max_recall, 14, 5)
DATA_FORMAT = (
- f"%-{max_db}s | %-{max_db_labels}s %-{max_case}s %-{len(self.task_label)}s "
- f"| %-{max_load_dur}s %-{max_qps}s %-15s %-{max_recall}s %-14s"
+ f"%-{max_db}s | %-{max_db_labels}s %-{max_case}s %-{len(self.task_label)}s"
+ f" | %-{max_load_dur}s %-{max_qps}s %-15s %-{max_recall}s %-14s"
+ f" | %-5s"
)
TITLE = DATA_FORMAT % (
- "DB", "db_label", "case", "label", "load_dur", "qps", "latency(p99)", "recall", "max_load_count")
+ "DB", "db_label", "case", "label", "load_dur", "qps", "latency(p99)", "recall", "max_load_count", "label")
SPLIT = DATA_FORMAT%tuple(map(lambda x:"-"*x, LENGTH))
SUMMERY_FORMAT = ("Task summery: run_id=%s, task_label=%s") % (self.run_id[:5], self.task_label)
fmt = [SUMMERY_FORMAT, TITLE, SPLIT]
@@ -197,6 +198,7 @@ def append_return(x, y):
f.metrics.serial_latency_p99,
f.metrics.recall,
f.metrics.max_load_count,
+ f.label.value,
))
tmp_logger = logging.getLogger("no_color")