From 6385016902a61a7dc816c588ac7a7623349b2fb0 Mon Sep 17 00:00:00 2001 From: "min.tian" Date: Wed, 22 Jan 2025 16:46:56 +0800 Subject: [PATCH] Label Filter Cases (only compatible with milvus) Signed-off-by: min.tian --- vectordb_bench/__init__.py | 28 +----- vectordb_bench/backend/cases.py | 91 +++++++++++++---- vectordb_bench/backend/clients/api.py | 21 +++- .../backend/clients/milvus/config.py | 3 +- .../backend/clients/milvus/milvus.py | 97 ++++++++++++++----- vectordb_bench/backend/dataset.py | 28 +++++- vectordb_bench/backend/filter.py | 75 ++++++++++++++ vectordb_bench/backend/runner/mp_runner.py | 7 +- .../backend/runner/read_write_runner.py | 4 +- .../backend/runner/serial_runner.py | 20 ++-- vectordb_bench/backend/task_runner.py | 7 +- .../components/check_results/filters.py | 54 +++++++---- .../components/label_filter/charts.py | 60 ++++++++++++ .../frontend/config/dbCaseConfigs.py | 39 +++++++- vectordb_bench/frontend/pages/label_filter.py | 52 ++++++++++ vectordb_bench/models.py | 1 + 16 files changed, 483 insertions(+), 104 deletions(-) create mode 100644 vectordb_bench/backend/filter.py create mode 100644 vectordb_bench/frontend/components/label_filter/charts.py create mode 100644 vectordb_bench/frontend/pages/label_filter.py diff --git a/vectordb_bench/__init__.py b/vectordb_bench/__init__.py index 42b8ed82..e20e8db3 100644 --- a/vectordb_bench/__init__.py +++ b/vectordb_bench/__init__.py @@ -27,33 +27,7 @@ class config: DROP_OLD = env.bool("DROP_OLD", True) USE_SHUFFLED_DATA = env.bool("USE_SHUFFLED_DATA", True) - NUM_CONCURRENCY = env.list( - "NUM_CONCURRENCY", - [ - 1, - 5, - 10, - 15, - 20, - 25, - 30, - 35, - 40, - 45, - 50, - 55, - 60, - 65, - 70, - 75, - 80, - 85, - 90, - 95, - 100, - ], - subcast=int, - ) + NUM_CONCURRENCY = env.list("NUM_CONCURRENCY", [1, 5, 10, 20, 30, 40, 60, 80], subcast=int) CONCURRENCY_DURATION = 30 diff --git a/vectordb_bench/backend/cases.py b/vectordb_bench/backend/cases.py index f0e437db..51eb6541 100644 --- a/vectordb_bench/backend/cases.py +++ b/vectordb_bench/backend/cases.py @@ -4,10 +4,9 @@ from vectordb_bench import config from vectordb_bench.backend.clients.api import MetricType +from vectordb_bench.backend.filter import Filter, FilterType, IntFilter, LabelFilter, non_filter from vectordb_bench.base import BaseModel -from vectordb_bench.frontend.components.custom.getCustomConfig import ( - CustomDatasetConfig, -) +from vectordb_bench.frontend.components.custom.getCustomConfig import CustomDatasetConfig from .dataset import CustomDataset, Dataset, DatasetManager, DatasetWithSizeType @@ -50,6 +49,8 @@ class CaseType(Enum): StreamingPerformanceCase = 200 + LabelFilterPerformanceCase = 300 + def case_cls(self, custom_configs: dict | None = None) -> type["Case"]: if custom_configs is None: return type2case.get(self)() @@ -97,15 +98,21 @@ class Case(BaseModel): filter_rate: float | None = None @property - def filters(self) -> dict | None: - if self.filter_rate is not None: - target_id = round(self.filter_rate * self.dataset.data.size) - return { - "metadata": f">={target_id}", - "id": target_id, - } + def filters(self) -> Filter: + return non_filter + + @property + def with_scalar_labels(self) -> bool: + return self.filters.type == FilterType.Label + + def check_scalar_labels(self) -> None: + if self.with_scalar_labels and not self.dataset.data.with_scalar_labels: + msg = f"Case init failed: no scalar_labels data in current dataset ({self.dataset.data.full_name})" + raise ValueError(msg) - return None + def __init__(self, **kwargs): + 
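        """Validate the case right after pydantic init: label-filter cases require
        a dataset that ships scalar label data (see check_scalar_labels), so an
        incompatible configuration fails fast instead of mid-run."""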
super().__init__(**kwargs) + self.check_scalar_labels() class CapacityCase(Case): @@ -151,6 +158,14 @@ class Performance768D10M(PerformanceCase): optimize_timeout: float | int | None = config.OPTIMIZE_TIMEOUT_768D_10M +class IntFilterPerformanceCase(PerformanceCase): + @property + def filters(self) -> Filter: + int_field = self.dataset.data.train_id_field + int_value = int(self.dataset.data.size * self.filter_rate) + return IntFilter(filter_rate=self.filter_rate, int_field=int_field, int_value=int_value) + + class Performance768D1M(PerformanceCase): case_id: CaseType = CaseType.Performance768D1M dataset: DatasetManager = Dataset.COHERE.manager(1_000_000) @@ -162,7 +177,7 @@ class Performance768D1M(PerformanceCase): optimize_timeout: float | int | None = config.OPTIMIZE_TIMEOUT_768D_1M -class Performance768D10M1P(PerformanceCase): +class Performance768D10M1P(IntFilterPerformanceCase): case_id: CaseType = CaseType.Performance768D10M1P filter_rate: float | int | None = 0.01 dataset: DatasetManager = Dataset.COHERE.manager(10_000_000) @@ -174,7 +189,7 @@ class Performance768D10M1P(PerformanceCase): optimize_timeout: float | int | None = config.OPTIMIZE_TIMEOUT_768D_10M -class Performance768D1M1P(PerformanceCase): +class Performance768D1M1P(IntFilterPerformanceCase): case_id: CaseType = CaseType.Performance768D1M1P filter_rate: float | int | None = 0.01 dataset: DatasetManager = Dataset.COHERE.manager(1_000_000) @@ -186,7 +201,7 @@ class Performance768D1M1P(PerformanceCase): optimize_timeout: float | int | None = config.OPTIMIZE_TIMEOUT_768D_1M -class Performance768D10M99P(PerformanceCase): +class Performance768D10M99P(IntFilterPerformanceCase): case_id: CaseType = CaseType.Performance768D10M99P filter_rate: float | int | None = 0.99 dataset: DatasetManager = Dataset.COHERE.manager(10_000_000) @@ -198,7 +213,7 @@ class Performance768D10M99P(PerformanceCase): optimize_timeout: float | int | None = config.OPTIMIZE_TIMEOUT_768D_10M -class Performance768D1M99P(PerformanceCase): +class Performance768D1M99P(IntFilterPerformanceCase): case_id: CaseType = CaseType.Performance768D1M99P filter_rate: float | int | None = 0.99 dataset: DatasetManager = Dataset.COHERE.manager(1_000_000) @@ -246,7 +261,7 @@ class Performance1536D5M(PerformanceCase): optimize_timeout: float | int | None = config.OPTIMIZE_TIMEOUT_1536D_5M -class Performance1536D500K1P(PerformanceCase): +class Performance1536D500K1P(IntFilterPerformanceCase): case_id: CaseType = CaseType.Performance1536D500K1P filter_rate: float | int | None = 0.01 dataset: DatasetManager = Dataset.OPENAI.manager(500_000) @@ -258,7 +273,7 @@ class Performance1536D500K1P(PerformanceCase): optimize_timeout: float | int | None = config.OPTIMIZE_TIMEOUT_1536D_500K -class Performance1536D5M1P(PerformanceCase): +class Performance1536D5M1P(IntFilterPerformanceCase): case_id: CaseType = CaseType.Performance1536D5M1P filter_rate: float | int | None = 0.01 dataset: DatasetManager = Dataset.OPENAI.manager(5_000_000) @@ -270,7 +285,7 @@ class Performance1536D5M1P(PerformanceCase): optimize_timeout: float | int | None = config.OPTIMIZE_TIMEOUT_1536D_5M -class Performance1536D500K99P(PerformanceCase): +class Performance1536D500K99P(IntFilterPerformanceCase): case_id: CaseType = CaseType.Performance1536D500K99P filter_rate: float | int | None = 0.99 dataset: DatasetManager = Dataset.OPENAI.manager(500_000) @@ -282,7 +297,7 @@ class Performance1536D500K99P(PerformanceCase): optimize_timeout: float | int | None = config.OPTIMIZE_TIMEOUT_1536D_500K -class 
Performance1536D5M99P(PerformanceCase):
+class Performance1536D5M99P(IntFilterPerformanceCase):
     case_id: CaseType = CaseType.Performance1536D5M99P
     filter_rate: float | int | None = 0.99
     dataset: DatasetManager = Dataset.OPENAI.manager(5_000_000)
@@ -408,6 +423,43 @@ def __init__(
         )
 
 
+class LabelFilterPerformanceCase(PerformanceCase):
+    case_id: CaseType = CaseType.LabelFilterPerformanceCase
+    dataset_with_size_type: DatasetWithSizeType
+    label_percentage: float
+
+    def __init__(
+        self,
+        dataset_with_size_type: DatasetWithSizeType | str,
+        label_percentage: float,
+        **kwargs,
+    ):
+        if not isinstance(dataset_with_size_type, DatasetWithSizeType):
+            dataset_with_size_type = DatasetWithSizeType(dataset_with_size_type)
+        name = f"Label-Filter-{label_percentage*100:.1f}% - {dataset_with_size_type.value}"
+        description = f"Label-Filter-{label_percentage*100:.1f}% Performance Test ({dataset_with_size_type.value})"
+        dataset = dataset_with_size_type.get_manager()
+        load_timeout = dataset_with_size_type.get_load_timeout()
+        optimize_timeout = dataset_with_size_type.get_optimize_timeout()
+        filters = LabelFilter(label_percentage=label_percentage)
+        filter_rate = filters.filter_rate
+        super().__init__(
+            name=name,
+            description=description,
+            dataset=dataset,
+            load_timeout=load_timeout,
+            optimize_timeout=optimize_timeout,
+            filter_rate=filter_rate,
+            dataset_with_size_type=dataset_with_size_type,
+            label_percentage=label_percentage,
+            **kwargs,
+        )
+
+    @property
+    def filters(self) -> Filter:
+        return LabelFilter(label_percentage=self.label_percentage)
+
+
 type2case = {
     CaseType.CapacityDim960: CapacityDim960,
     CaseType.CapacityDim128: CapacityDim128,
@@ -427,4 +479,5 @@ def __init__(
     CaseType.Performance1536D50K: Performance1536D50K,
     CaseType.PerformanceCustomDataset: PerformanceCustomDataset,
     CaseType.StreamingPerformanceCase: StreamingPerformanceCase,
+    CaseType.LabelFilterPerformanceCase: LabelFilterPerformanceCase,
 }
diff --git a/vectordb_bench/backend/clients/api.py b/vectordb_bench/backend/clients/api.py
index a86849e9..d409b8d7 100644
--- a/vectordb_bench/backend/clients/api.py
+++ b/vectordb_bench/backend/clients/api.py
@@ -4,6 +4,8 @@
 
 from pydantic import BaseModel, SecretStr, validator
 
+from vectordb_bench.backend.filter import Filter, FilterType
+
 
 class MetricType(str, Enum):
     L2 = "L2"
@@ -110,6 +112,21 @@ class VectorDB(ABC):
     >>> milvus.search_embedding()
     """
 
+    # Filter types supported by this client; by default only non-filter queries.
+    supported_filter_types: list[FilterType] = [FilterType.NonFilter]
+
+    @classmethod
+    def filter_supported(cls, filters: Filter) -> bool:
+        """Check that a filter is supported before running a filtering case."""
+        return filters.type in cls.supported_filter_types
+
+    def prepare_filter(self, filters: Filter):
+        """Let the client precompute its native filter condition once per case,
+        avoiding redundant work during the test.
+
+        (All search requests within a case share the same filter condition.)"""
+        return
+
     @abstractmethod
     def __init__(
         self,
@@ -160,8 +177,9 @@ def insert_embeddings(
         self,
         embeddings: list[list[float]],
         metadata: list[int],
+        labels_data: list[str] | None = None,
         **kwargs,
-    ) -> (int, Exception):
+    ) -> tuple[int, Exception]:
         """Insert the embeddings into the vector database. The default number of embeddings for
         each insert_embeddings call is 5000.
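For client implementations, the two hooks above are the whole integration surface: declare the supported `FilterType`s and translate the generic `Filter` into a native predicate once per case. A minimal sketch of that pattern (the `FakeClient` class and its `expr` attribute are illustrative assumptions, not part of this patch):

```python
from vectordb_bench.backend.filter import Filter, FilterType, IntFilter, LabelFilter


class FakeClient:
    # advertise supported filters; the runner checks this before a case starts
    supported_filter_types: list[FilterType] = [FilterType.NonFilter, FilterType.Label]

    @classmethod
    def filter_supported(cls, filters: Filter) -> bool:
        return filters.type in cls.supported_filter_types

    def prepare_filter(self, filters: Filter):
        # translate the generic Filter into a native predicate, once per case
        if filters.type == FilterType.NonFilter:
            self.expr = None
        elif filters.type == FilterType.Label:
            self.expr = f"{filters.label_field} == '{filters.label_value}'"
        else:
            msg = f"unsupported filter type: {filters.type}"
            raise ValueError(msg)


client = FakeClient()
assert client.filter_supported(LabelFilter(label_percentage=0.01))
assert not client.filter_supported(IntFilter(filter_rate=0.01, int_value=50_000))
client.prepare_filter(LabelFilter(label_percentage=0.01))
print(client.expr)  # labels == 'label_1p'
```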
@@ -180,7 +198,6 @@ def search_embedding( self, query: list[float], k: int = 100, - filters: dict | None = None, ) -> list[int]: """Get k most similar embeddings to query vector. diff --git a/vectordb_bench/backend/clients/milvus/config.py b/vectordb_bench/backend/clients/milvus/config.py index f49084d9..ceab5826 100644 --- a/vectordb_bench/backend/clients/milvus/config.py +++ b/vectordb_bench/backend/clients/milvus/config.py @@ -4,7 +4,7 @@ class MilvusConfig(DBConfig): - uri: SecretStr = "http://10.102.7.230:19530" + uri: SecretStr = "http://localhost:19530" user: str | None = None password: SecretStr | None = None @@ -33,6 +33,7 @@ class MilvusIndexConfig(BaseModel): index: IndexType metric_type: MetricType | None = None + use_partition_key: bool = True # for label-filter @property def is_gpu_index(self) -> bool: diff --git a/vectordb_bench/backend/clients/milvus/milvus.py b/vectordb_bench/backend/clients/milvus/milvus.py index 4015eb1f..339d6d69 100644 --- a/vectordb_bench/backend/clients/milvus/milvus.py +++ b/vectordb_bench/backend/clients/milvus/milvus.py @@ -7,6 +7,8 @@ from pymilvus import Collection, CollectionSchema, DataType, FieldSchema, MilvusException, utility +from vectordb_bench.backend.filter import Filter, FilterType + from ..api import VectorDB from .config import MilvusIndexConfig @@ -16,6 +18,12 @@ class Milvus(VectorDB): + supported_filter_types: list[FilterType] = [ + FilterType.NonFilter, + FilterType.Int, + FilterType.Label, + ] + def __init__( self, dim: int, @@ -24,6 +32,7 @@ def __init__( collection_name: str = "VectorDBBenchCollection", drop_old: bool = False, name: str = "Milvus", + with_scalar_labels: bool = False, **kwargs, ): """Initialize wrapper around the milvus vector database.""" @@ -32,11 +41,15 @@ def __init__( self.case_config = db_case_config self.collection_name = collection_name self.batch_size = int(MILVUS_LOAD_REQS_SIZE / (dim * 4)) + self.with_scalar_labels = with_scalar_labels self._primary_field = "pk" - self._scalar_field = "id" + self._scalar_id_field = "id" + self._scalar_label_field = "label" self._vector_field = "vector" - self._index_name = "vector_idx" + self._vector_index_name = "vector_idx" + self._scalar_id_index_name = "id_sort_idx" + self._scalar_labels_index_name = "labels_idx" from pymilvus import connections @@ -48,9 +61,20 @@ def __init__( if not utility.has_collection(self.collection_name): fields = [ FieldSchema(self._primary_field, DataType.INT64, is_primary=True), - FieldSchema(self._scalar_field, DataType.INT64), + FieldSchema(self._scalar_id_field, DataType.INT64), FieldSchema(self._vector_field, DataType.FLOAT_VECTOR, dim=dim), ] + if self.with_scalar_labels: + is_partition_key = db_case_config.use_partition_key + log.info(f"with_scalar_labels, add a new varchar field, as partition_key: {is_partition_key}") + fields.append( + FieldSchema( + self._scalar_label_field, + DataType.VARCHAR, + max_length=256, + is_partition_key=is_partition_key, + ) + ) log.info(f"{self.name} create collection: {self.collection_name}") @@ -61,15 +85,37 @@ def __init__( consistency_level="Session", ) - col.create_index( - self._vector_field, - self.case_config.index_param(), - index_name=self._index_name, - ) + self.create_index() col.load() connections.disconnect("default") + def create_index(self): + col = Collection(self.collection_name) + # vector index + col.create_index( + self._vector_field, + self.case_config.index_param(), + index_name=self._vector_index_name, + ) + # scalar index for range-expr (int-filter) + col.create_index( + 
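+            # STL_SORT builds a sorted index over this int field, which suits the
+            # ">=" range expression emitted by IntFilter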
self._scalar_id_field, + index_params={ + "index_type": "STL_SORT", + }, + index_name=self._scalar_id_index_name, + ) + # scalar index for varchar (label-filter) + if self.with_scalar_labels: + col.create_index( + self._scalar_label_field, + index_params={ + "index_type": "BITMAP", + }, + index_name=self._scalar_labels_index_name, + ) + @contextmanager def init(self) -> None: """ @@ -102,17 +148,13 @@ def _post_insert(self): try: self.col.flush() # wait for index done and load refresh - self.col.create_index( - self._vector_field, - self.case_config.index_param(), - index_name=self._index_name, - ) + self.create_index() - utility.wait_for_index_building_complete(self.collection_name) + utility.wait_for_index_building_complete(self.collection_name, index_name=self._vector_index_name) def wait_index(): while True: - progress = utility.index_building_progress(self.collection_name) + progress = utility.index_building_progress(self.collection_name, index_name=self._vector_index_name) if progress.get("pending_index_rows", -1) == 0: break time.sleep(5) @@ -154,8 +196,9 @@ def insert_embeddings( self, embeddings: Iterable[list[float]], metadata: list[int], + labels_data: list[str] | None = None, **kwargs, - ) -> (int, Exception): + ) -> tuple[int, Exception]: """Insert embeddings into Milvus. should call self.init() first""" # use the first insert_embeddings to init collection assert self.col is not None @@ -169,32 +212,42 @@ def insert_embeddings( metadata[batch_start_offset:batch_end_offset], embeddings[batch_start_offset:batch_end_offset], ] + if self.with_scalar_labels: + insert_data.append(labels_data[batch_start_offset:batch_end_offset]) res = self.col.insert(insert_data) insert_count += len(res.primary_keys) except MilvusException as e: log.info(f"Failed to insert data: {e}") - return (insert_count, e) - return (insert_count, None) + return insert_count, e + return insert_count, None + + def prepare_filter(self, filters: Filter): + if filters.type == FilterType.NonFilter: + self.expr = "" + elif filters.type == FilterType.Int: + self.expr = f"{self._scalar_id_field} >= {filters.int_value}" + elif filters.type == FilterType.Label: + self.expr = f"{self._scalar_label_field} == '{filters.label_value}'" + else: + msg = f"Not support Filter for Milvus - {filters}" + raise ValueError(msg) def search_embedding( self, query: list[float], k: int = 100, - filters: dict | None = None, timeout: int | None = None, ) -> list[int]: """Perform a search on a query embedding and return results.""" assert self.col is not None - expr = f"{self._scalar_field} {filters.get('metadata')}" if filters else "" - # Perform the search. res = self.col.search( data=[query], anns_field=self._vector_field, param=self.case_config.search_param(), limit=k, - expr=expr, + expr=self.expr, ) # Organize results. diff --git a/vectordb_bench/backend/dataset.py b/vectordb_bench/backend/dataset.py index 7c030709..a2022940 100644 --- a/vectordb_bench/backend/dataset.py +++ b/vectordb_bench/backend/dataset.py @@ -20,6 +20,7 @@ from . 
import utils from .clients import MetricType from .data_source import DatasetReader, DatasetSource +from .filter import Filter, FilterType, non_filter log = logging.getLogger(__name__) @@ -40,7 +41,13 @@ class BaseDataset(BaseModel): _size_label: dict[int, SizeLabel] = PrivateAttr() is_custom: bool = False with_remote_resource: bool = True + # for label filter cases with_scalar_labels: bool = False + # if True, scalar_labels will be retrieved from a separate parquet file; + # otherwise, they will be obtained from train.parquet. + scalar_labels_file_separated: bool = True + scalar_labels_file: str = "scalar_labels.parquet" + scalar_label_percentages: list[float] = [] train_id_field: str = "id" train_vector_field: str = "emb" test_file: str = "test.parquet" @@ -133,6 +140,8 @@ class Cohere(BaseDataset): 1_000_000: SizeLabel(1_000_000, "MEDIUM", 1), 10_000_000: SizeLabel(10_000_000, "LARGE", 10), } + with_scalar_labels: bool = True + scalar_label_percentages: list[float] = [0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5] class Glove(BaseDataset): @@ -170,6 +179,8 @@ class OpenAI(BaseDataset): 500_000: SizeLabel(500_000, "MEDIUM", 1), 5_000_000: SizeLabel(5_000_000, "LARGE", 10), } + with_scalar_labels: bool = True + scalar_label_percentages: list[float] = [0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5] class DatasetManager(BaseModel): @@ -186,6 +197,7 @@ class DatasetManager(BaseModel): data: BaseDataset test_data: list[list[float]] | None = None gt_data: list[list[int]] | None = None + scalar_labels: pl.DataFrame | None = None train_files: list[str] = [] reader: DatasetReader | None = None @@ -219,14 +231,14 @@ def __iter__(self): def prepare( self, source: DatasetSource = DatasetSource.S3, - filters: float | str | None = None, + filters: Filter = non_filter, ) -> bool: """Download the dataset from DatasetSource url = f"{source}/{self.data.dir_name}" Args: source(DatasetSource): S3 or AliyunOSS, default as S3 - filters(Optional[int | float | str]): combined with dataset's with_gt to + filters(Filter): combined with dataset's with_gt to compose the correct ground_truth file Returns: @@ -236,17 +248,23 @@ def prepare( self.train_files = self.data.train_files gt_file, test_file = None, None if self.data.with_gt: - gt_file, test_file = utils.compose_gt_file(filters), self.data.test_file + gt_file, test_file = filters.groundtruth_file, self.data.test_file if self.data.with_remote_resource: download_files = [file for file in self.train_files] download_files.extend([gt_file, test_file]) + if self.data.with_scalar_labels and self.data.scalar_labels_file_separated: + download_files.append(self.data.scalar_labels_file) source.reader().read( dataset=self.data.dir_name.lower(), files=download_files, local_ds_root=self.data_dir, ) + # read scalar_labels_file if separated + if filters.type == FilterType.Label and self.data.with_scalar_labels and self.data.scalar_labels_file_separated: + self.scalar_labels = self._read_file(self.data.scalar_labels_file) + if gt_file is not None and test_file is not None: self.test_data = self._read_file(test_file)[self.data.test_vector_field].to_list() self.gt_data = self._read_file(gt_file)[self.data.gt_neighbors_field].to_list() @@ -255,13 +273,13 @@ def prepare( return True - def _read_file(self, file_name: str) -> pd.DataFrame: + def _read_file(self, file_name: str) -> pl.DataFrame: """read one file from disk into memory""" log.info(f"Read the entire file into memory: {file_name}") p = pathlib.Path(self.data_dir, file_name) if not p.exists(): 
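+            # a missing optional file (e.g. scalar_labels.parquet for datasets
+            # without label data) degrades to an empty frame instead of failing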
log.warning(f"No such file: {p}")
-            return pd.DataFrame()
+            return pl.DataFrame()
 
         return pl.read_parquet(p)
diff --git a/vectordb_bench/backend/filter.py b/vectordb_bench/backend/filter.py
new file mode 100644
index 00000000..e9494d3b
--- /dev/null
+++ b/vectordb_bench/backend/filter.py
@@ -0,0 +1,75 @@
+from enum import StrEnum
+
+from ..base import BaseModel
+
+
+class FilterType(StrEnum):
+    Int = "Int"  # tests ">=" range expressions
+    Label = "Label"  # tests "==" equality expressions
+    NonFilter = "NonFilter"
+
+
+class Filter(BaseModel):
+    type: FilterType
+    filter_rate: float = 0.0
+
+    @property
+    def groundtruth_file(self) -> str:
+        raise NotImplementedError
+
+
+class NonFilter(Filter):
+    type: FilterType = FilterType.NonFilter
+    filter_rate: float = 0.0
+
+    @property
+    def groundtruth_file(self) -> str:
+        return "neighbors.parquet"
+
+
+non_filter = NonFilter()
+
+
+class IntFilter(Filter):
+    """
+    Compatible with the older int-filter cases.
+    filter expr: int_field >= int_value (where int_value = dataset_size * filter_rate)
+    """
+
+    type: FilterType = FilterType.Int
+    int_field: str = "id"
+    int_value: int
+
+    @property
+    def groundtruth_file(self) -> str:
+        if self.filter_rate == 0.01:
+            return "neighbors_head_1p.parquet"
+        if self.filter_rate == 0.99:
+            return "neighbors_tail_1p.parquet"
+        msg = f"IntFilter has no prepared groundtruth for filter_rate={self.filter_rate}; only 0.01 and 0.99 are supported"
+        raise RuntimeError(msg)
+
+
+class LabelFilter(Filter):
+    """
+    filter expr: label_field == label_value, e.g. `color == "red"`
+    """
+
+    type: FilterType = FilterType.Label
+    label_field: str = "labels"
+    label_percentage: float
+
+    @property
+    def label_value(self) -> str:
+        p = self.label_percentage * 100
+        if p >= 1:
+            return f"label_{int(p)}p"  # e.g. label_1p, label_5p, label_20p, ...
+        return f"label_{p:.1f}p"  # e.g. label_0.1p, label_0.5p, ...
+
+    def __init__(self, label_percentage: float, **kwargs):
+        filter_rate = 1.0 - label_percentage
+        super().__init__(filter_rate=filter_rate, label_percentage=label_percentage, **kwargs)
+
+    @property
+    def groundtruth_file(self) -> str:
+        return f"neighbors_{self.label_field}_{self.label_value}.parquet"
diff --git a/vectordb_bench/backend/runner/mp_runner.py b/vectordb_bench/backend/runner/mp_runner.py
index eb6c49fb..749872cf 100644
--- a/vectordb_bench/backend/runner/mp_runner.py
+++ b/vectordb_bench/backend/runner/mp_runner.py
@@ -8,6 +8,8 @@
 
 import numpy as np
 
+from vectordb_bench.backend.filter import Filter, non_filter
+
 from ... 
import config from ..clients import api @@ -29,7 +31,7 @@ def __init__( db: api.VectorDB, test_data: list[list[float]], k: int = 100, - filters: dict | None = None, + filters: Filter = non_filter, concurrencies: Iterable[int] = config.NUM_CONCURRENCY, duration: int = 30, ): @@ -54,6 +56,7 @@ def search( cond.wait() with self.db.init(): + self.db.prepare_filter(self.filters) num, idx = len(test_data), random.randint(0, len(test_data) - 1) start_time = time.perf_counter() @@ -65,7 +68,6 @@ def search( self.db.search_embedding( test_data[idx], self.k, - self.filters, ) count += 1 latencies.append(time.perf_counter() - s) @@ -246,6 +248,7 @@ def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, con cond.wait() with self.db.init(): + self.db.prepare_filter(self.filters) num, idx = len(test_data), random.randint(0, len(test_data) - 1) start_time = time.perf_counter() diff --git a/vectordb_bench/backend/runner/read_write_runner.py b/vectordb_bench/backend/runner/read_write_runner.py index d37ab574..a236e31c 100644 --- a/vectordb_bench/backend/runner/read_write_runner.py +++ b/vectordb_bench/backend/runner/read_write_runner.py @@ -10,6 +10,7 @@ from vectordb_bench.backend.clients import api from vectordb_bench.backend.dataset import DatasetManager +from vectordb_bench.backend.filter import Filter, non_filter from vectordb_bench.backend.utils import time_it from vectordb_bench.metric import Metric @@ -28,7 +29,7 @@ def __init__( insert_rate: int = 1000, normalize: bool = False, k: int = 100, - filters: dict | None = None, + filters: Filter = non_filter, concurrencies: Iterable[int] = (1, 15, 50), search_stages: Iterable[float] = ( 0.5, @@ -82,6 +83,7 @@ def __init__( test_data=test_emb, ground_truth=dataset.gt_data, k=k, + filters=filters, ) @time_it diff --git a/vectordb_bench/backend/runner/serial_runner.py b/vectordb_bench/backend/runner/serial_runner.py index db86b4b3..e7dfc7c8 100644 --- a/vectordb_bench/backend/runner/serial_runner.py +++ b/vectordb_bench/backend/runner/serial_runner.py @@ -9,6 +9,7 @@ import psutil from vectordb_bench.backend.dataset import DatasetManager +from vectordb_bench.backend.filter import Filter, FilterType, non_filter from ... 
import config
from ...metric import calc_ndcg, calc_recall, get_ideal_dcg
@@ -28,12 +29,14 @@ def __init__(
         db: api.VectorDB,
         dataset: DatasetManager,
         normalize: bool,
+        filters: Filter = non_filter,
         timeout: float | None = None,
     ):
         self.timeout = timeout if isinstance(timeout, int | float) else None
         self.dataset = dataset
         self.db = db
         self.normalize = normalize
+        self.filters = filters
 
     def task(self) -> int:
         count = 0
@@ -52,9 +55,17 @@ def task(self) -> int:
                 del emb_np
                 log.debug(f"batch dataset size: {len(all_embeddings)}, {len(all_metadata)}")
 
+                labels_data = None
+                if self.filters.type == FilterType.Label:
+                    if self.dataset.data.scalar_labels_file_separated:
+                        labels_data = self.dataset.scalar_labels[self.filters.label_field][all_metadata].to_list()
+                    else:
+                        labels_data = data_df[self.filters.label_field].tolist()
+
                 insert_count, error = self.db.insert_embeddings(
                     embeddings=all_embeddings,
                     metadata=all_metadata,
+                    labels_data=labels_data,
                 )
                 if error is not None:
                     raise error
@@ -188,7 +199,7 @@ def __init__(
         test_data: list[list[float]],
         ground_truth: list[list[int]],
         k: int = 100,
-        filters: dict | None = None,
+        filters: Filter = non_filter,
     ):
         self.db = db
         self.k = k
@@ -202,11 +213,7 @@ def __init__(
 
     def _get_db_search_res(self, emb: list[float], retry_idx: int = 0) -> list[int]:
         try:
-            results = self.db.search_embedding(
-                emb,
-                self.k,
-                self.filters,
-            )
+            results = self.db.search_embedding(emb, self.k)
         except Exception as e:
             log.warning(f"Serial search failed, retry_idx={retry_idx}, Exception: {e}")
             if retry_idx < config.Max_Search_Retry:
@@ -220,6 +227,7 @@ def _get_db_search_res(self, emb: list[float], retry_idx: int = 0) -> list[int]:
     def search(self, args: tuple[list, list[list[int]]]) -> tuple[float, float, float]:
         log.info(f"{mp.current_process().name:14} start search the entire test_data to get recall and latency")
         with self.db.init():
+            self.db.prepare_filter(self.filters)
             test_data, ground_truth = args
             ideal_dcg = get_ideal_dcg(self.k)
diff --git a/vectordb_bench/backend/task_runner.py b/vectordb_bench/backend/task_runner.py
index 90ccf762..2003a571 100644
--- a/vectordb_bench/backend/task_runner.py
+++ b/vectordb_bench/backend/task_runner.py
@@ -56,6 +56,8 @@ def __eq__(self, obj: any):
                 and self.config.db == obj.config.db
                 and self.config.db_case_config == obj.config.db_case_config
                 and self.ca.dataset == obj.ca.dataset
+                # Label-filter cases store an extra scalar field, so it must match too
+                and self.ca.with_scalar_labels == obj.ca.with_scalar_labels
             )
         return False
@@ -92,12 +94,13 @@ def init_db(self, drop_old: bool = True) -> None:
             db_config=self.config.db_config.to_dict(),
             db_case_config=self.config.db_case_config,
             drop_old=drop_old,
+            with_scalar_labels=self.ca.with_scalar_labels,
         )
 
     def _pre_run(self, drop_old: bool = True):
         try:
             self.init_db(drop_old)
-            self.ca.dataset.prepare(self.dataset_source, filters=self.ca.filter_rate)
+            self.ca.dataset.prepare(self.dataset_source, filters=self.ca.filters)
         except ModuleNotFoundError as e:
             log.warning(f"pre run case error: please install client for db: {self.config.db}, error={e}")
             raise e from None
@@ -130,6 +133,7 @@ def _run_capacity_case(self) -> Metric:
                 self.db,
                 self.ca.dataset,
                 self.normalize,
+                self.ca.filters,
                 self.ca.load_timeout,
             )
             count = runner.run_endlessness()
@@ -206,6 +210,7 @@ def _load_train_data(self):
                 self.db,
                 self.ca.dataset,
                 self.normalize,
+                self.ca.filters,
                 self.ca.load_timeout,
             )
             runner.run()
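The batch insert path above is the only place scalar labels get materialized: when the label file is separated, the serial runner indexes the cached `scalar_labels` polars frame with the batch's metadata ids. A toy, self-contained illustration of that lookup (all values invented):

```python
import polars as pl

# stand-in for the cached scalar_labels.parquet content
scalar_labels = pl.DataFrame({"labels": ["label_1p", "label_5p", "label_1p", "label_20p"]})

all_metadata = [2, 0, 3]  # train ids of the current insert batch
labels_data = scalar_labels["labels"][all_metadata].to_list()
print(labels_data)  # ['label_1p', 'label_1p', 'label_20p']
```

diff --git a/vectordb_bench/frontend/components/check_results/filters.py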
b/vectordb_bench/frontend/components/check_results/filters.py index efe7c84a..3b341b20 100644 --- a/vectordb_bench/frontend/components/check_results/filters.py +++ b/vectordb_bench/frontend/components/check_results/filters.py @@ -1,4 +1,6 @@ from vectordb_bench.backend.cases import Case +from vectordb_bench.backend.dataset import DatasetWithSizeType +from vectordb_bench.backend.filter import FilterType from vectordb_bench.frontend.components.check_results.data import getChartData from vectordb_bench.frontend.components.check_results.expanderStyle import ( initSidebarExanderStyle, @@ -11,7 +13,7 @@ from vectordb_bench.models import CaseResult, TestResult -def getshownData(st, results: list[TestResult], **kwargs): +def getshownData(st, results: list[TestResult], filter_type: FilterType = FilterType.NonFilter, **kwargs): # hide the nav st.markdown( "", @@ -21,7 +23,7 @@ def getshownData(st, results: list[TestResult], **kwargs): st.header("Filters") shownResults = getshownResults(st, results, **kwargs) - showDBNames, showCaseNames = getShowDbsAndCases(st, shownResults) + showDBNames, showCaseNames = getShowDbsAndCases(st, shownResults, filter_type) shownData, failedTasks = getChartData(shownResults, showDBNames, showCaseNames) @@ -55,11 +57,12 @@ def getshownResults( return selectedResult -def getShowDbsAndCases(st, result: list[CaseResult]) -> tuple[list[str], list[str]]: +def getShowDbsAndCases(st, result: list[CaseResult], filter_type: FilterType) -> tuple[list[str], list[str]]: initSidebarExanderStyle(st) - allDbNames = list(set({res.task_config.db_name for res in result})) + case_results = [res for res in result if res.task_config.case_config.case.filters.type == filter_type] + allDbNames = list(set({res.task_config.db_name for res in case_results})) allDbNames.sort() - allCases: list[Case] = [res.task_config.case_config.case for res in result] + allCases: list[Case] = [res.task_config.case_config.case for res in case_results] # DB Filter dbFilterContainer = st.container() @@ -71,19 +74,36 @@ def getShowDbsAndCases(st, result: list[CaseResult]) -> tuple[list[str], list[st ) showCaseNames = [] - allCaseNameSet = set({case.name for case in allCases}) - allCaseNames = [case_name for case_name in CASE_NAME_ORDER if case_name in allCaseNameSet] + [ - case_name for case_name in allCaseNameSet if case_name not in CASE_NAME_ORDER - ] + if filter_type == FilterType.NonFilter: + allCaseNameSet = set({case.name for case in allCases}) + allCaseNames = [case_name for case_name in CASE_NAME_ORDER if case_name in allCaseNameSet] + [ + case_name for case_name in allCaseNameSet if case_name not in CASE_NAME_ORDER + ] + + # Case Filter + caseFilterContainer = st.container() + showCaseNames = filterView( + caseFilterContainer, + "Case Filter", + [caseName for caseName in allCaseNames], + col=1, + ) - # Case Filter - caseFilterContainer = st.container() - showCaseNames = filterView( - caseFilterContainer, - "Case Filter", - [caseName for caseName in allCaseNames], - col=1, - ) + if filter_type == FilterType.Label: + container = st.container() + datasetWithSizeTypes = [dataset_with_size_type for dataset_with_size_type in DatasetWithSizeType] + showDatasetWithSizeTypes = filterView( + container, + "Case Filter", + datasetWithSizeTypes, + col=1, + optionLables=[v.value for v in datasetWithSizeTypes], + ) + datasets = [dataset_with_size_type.get_manager() for dataset_with_size_type in showDatasetWithSizeTypes] + showCaseNames = list(set([case.name for case in allCases if case.dataset in datasets])) + + if 
filter_type == FilterType.Int: + raise NotImplementedError return showDBNames, showCaseNames diff --git a/vectordb_bench/frontend/components/label_filter/charts.py b/vectordb_bench/frontend/components/label_filter/charts.py new file mode 100644 index 00000000..88168103 --- /dev/null +++ b/vectordb_bench/frontend/components/label_filter/charts.py @@ -0,0 +1,60 @@ +import plotly.express as px +from vectordb_bench.metric import metric_unit_map + + +def drawCharts(st, allData, **kwargs): + dataset_names = list(set([data["dataset_name"] for data in allData])) + dataset_names.sort() + for dataset_name in dataset_names: + container = st.container() + container.subheader(dataset_name) + data = [d for d in allData if d["dataset_name"] == dataset_name] + drawChartByMetric(container, data, **kwargs) + + +def drawChartByMetric(st, data, metrics=("qps", "recall"), **kwargs): + columns = st.columns(len(metrics)) + for i, metric in enumerate(metrics): + container = columns[i] + container.markdown(f"#### {metric}") + drawChart(container, data, metric) + + +def getRange(metric, data, padding_multipliers): + minV = min([d.get(metric, 0) for d in data]) + maxV = max([d.get(metric, 0) for d in data]) + padding = maxV - minV + rangeV = [ + minV - padding * padding_multipliers[0], + maxV + padding * padding_multipliers[1], + ] + return rangeV + + +def drawChart(st, data: list[object], metric): + unit = metric_unit_map.get(metric, "") + x = "filter_rate" + xrange = getRange(x, data, [0.05, 0.1]) + + y = metric + yrange = getRange(y, data, [0.2, 0.1]) + + data.sort(key=lambda a: a[x]) + + fig = px.line( + data, + x=x, + y=y, + color="db_name", + line_group="db_name", + text=metric, + markers=True, + ) + fig.update_xaxes(range=xrange) + fig.update_yaxes(range=yrange) + fig.update_traces(textposition="bottom right", texttemplate="%{y:,.4~r}" + unit) + fig.update_layout( + margin=dict(l=0, r=0, t=40, b=0, pad=8), + legend=dict(orientation="h", yanchor="bottom", y=1, xanchor="right", x=1, title=""), + ) + st.plotly_chart(fig, use_container_width=True) diff --git a/vectordb_bench/frontend/config/dbCaseConfigs.py b/vectordb_bench/frontend/config/dbCaseConfigs.py index 8fdb899d..9fb34b5a 100644 --- a/vectordb_bench/frontend/config/dbCaseConfigs.py +++ b/vectordb_bench/frontend/config/dbCaseConfigs.py @@ -180,6 +180,17 @@ def generate_custom_streaming_case() -> CaseConfig: ] +def generate_label_filter_cases(dataset_with_size_type: DatasetWithSizeType) -> list[CaseConfig]: + label_percentages = dataset_with_size_type.get_manager().data.scalar_label_percentages + return [ + CaseConfig( + case_id=CaseType.LabelFilterPerformanceCase, + custom_case=dict(dataset_with_size_type=dataset_with_size_type, label_percentage=label_percentage), + ) + for label_percentage in label_percentages + ] + + UI_CASE_CLUSTERS: list[UICaseItemCluster] = [ UICaseItemCluster( label="Search Performance Test", @@ -194,7 +205,7 @@ def generate_custom_streaming_case() -> CaseConfig: ], ), UICaseItemCluster( - label="Filter Search Performance Test", + label="Int-Filter Search Performance Test", uiCaseItems=[ UICaseItem(cases=generate_normal_cases(CaseType.Performance768D10M1P)), UICaseItem(cases=generate_normal_cases(CaseType.Performance768D10M99P)), @@ -207,6 +218,23 @@ def generate_custom_streaming_case() -> CaseConfig: UICaseItem(cases=generate_normal_cases(CaseType.Performance1536D500K99P)), ], ), + UICaseItemCluster( + label="Label-Filter Search Performance Test", + uiCaseItems=[ + UICaseItem( + label=f"Label-Filter Search Performance Test - 
{dataset_with_size_type.value}",
+                description=(
+                    "[Batch Cases] These cases evaluate search performance under "
+                    'label-filtering constraints such as `color == "red"`. '
+                    "VDBBench adds an extra column of randomly distributed labels with fixed proportions, "
+                    f"such as {dataset_with_size_type.get_manager().data.scalar_label_percentages}. "
+                    f"Each label proportion in {dataset_with_size_type.value} is tested in turn, to "
+                    "assess the vector database's search performance across different filtering selectivities."
+                ),
+                cases=generate_label_filter_cases(dataset_with_size_type),
+            )
+            for dataset_with_size_type in DatasetWithSizeType
+        ],
+    ),
     UICaseItemCluster(
         label="Capacity Test",
         uiCaseItems=[
@@ -1132,6 +1160,13 @@ def generate_custom_streaming_case() -> CaseConfig:
     },
 )
 
+CaseConfigParamInput_Milvus_use_partition_key = CaseConfigInput(
+    label=CaseConfigParamType.use_partition_key,
+    inputType=InputType.Option,
+    inputHelp="Whether to build the label field as a partition key; takes effect only in label-filter cases",
+    inputConfig={"options": [True, False]},
+)
+
 
 MilvusLoadConfig = [
     CaseConfigParamInput_IndexType,
@@ -1144,6 +1179,7 @@
     CaseConfigParamInput_graph_degree,
     CaseConfigParamInput_build_algo,
     CaseConfigParamInput_cache_dataset_on_device,
+    CaseConfigParamInput_Milvus_use_partition_key,
 ]
 MilvusPerformanceConfig = [
     CaseConfigParamInput_IndexType,
@@ -1165,6 +1201,7 @@
     CaseConfigParamInput_build_algo,
     CaseConfigParamInput_cache_dataset_on_device,
     CaseConfigParamInput_refine_ratio,
+    CaseConfigParamInput_Milvus_use_partition_key,
 ]
 
 WeaviateLoadConfig = [
diff --git a/vectordb_bench/frontend/pages/label_filter.py b/vectordb_bench/frontend/pages/label_filter.py
new file mode 100644
index 00000000..dc3d9908
--- /dev/null
+++ b/vectordb_bench/frontend/pages/label_filter.py
@@ -0,0 +1,52 @@
+import streamlit as st
+from vectordb_bench.backend.filter import FilterType
+from vectordb_bench.frontend.components.check_results.footer import footer
+from vectordb_bench.frontend.components.check_results.headerIcon import drawHeaderIcon
+from vectordb_bench.frontend.components.check_results.nav import (
+    NavToQuriesPerDollar,
+    NavToRunTest,
+)
+from vectordb_bench.frontend.components.label_filter.charts import drawCharts
+from vectordb_bench.frontend.components.check_results.filters import getshownData
+from vectordb_bench.frontend.config.styles import FAVICON
+from vectordb_bench.interface import benchmark_runner
+
+
+def main():
+    # set page config
+    st.set_page_config(
+        page_title="Label Filter",
+        page_icon=FAVICON,
+        layout="wide",
+        # initial_sidebar_state="collapsed",
+    )
+
+    # header
+    drawHeaderIcon(st)
+
+    allResults = benchmark_runner.get_results()
+
+    st.title("Vector Database Benchmark (Label Filter)")
+
+    # results selector and filter
+    resultSelectorContainer = st.sidebar.container()
+    shownData, failedTasks, showCaseNames = getshownData(
+        resultSelectorContainer, allResults, filter_type=FilterType.Label
+    )
+
+    resultSelectorContainer.divider()
+
+    # nav
+    navContainer = st.sidebar.container()
+    NavToRunTest(navContainer)
+    NavToQuriesPerDollar(navContainer)
+
+    # charts
+    drawCharts(st, shownData)
+
+    # footer
+    footer(st.container())
+
+
+if __name__ == "__main__":
+    main()
diff --git a/vectordb_bench/models.py b/vectordb_bench/models.py
index e233aa80..591e103e 100644
--- a/vectordb_bench/models.py
+++ b/vectordb_bench/models.py
@@ -87,6 +87,7 @@ class
CaseConfigParamType(Enum): preReorderingNumNeigbors = "pre_reordering_num_neighbors" numSearchThreads = "num_search_threads" maxNumPrefetchDatasets = "max_num_prefetch_datasets" + use_partition_key = "use_partition_key" dataset_with_size_type = "dataset_with_size_type" insert_rate = "insert_rate"
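To sanity-check the groundtruth-file naming scheme introduced in `backend/filter.py`, the expected values below follow directly from the patched code (the percentages are arbitrary examples):

```python
from vectordb_bench.backend.filter import IntFilter, LabelFilter, non_filter

print(non_filter.groundtruth_file)  # neighbors.parquet

f = LabelFilter(label_percentage=0.05)
print(f.label_value)       # label_5p
print(f.filter_rate)       # 0.95 (defined as 1 - label_percentage)
print(f.groundtruth_file)  # neighbors_labels_label_5p.parquet

print(LabelFilter(label_percentage=0.001).label_value)  # label_0.1p

head = IntFilter(filter_rate=0.01, int_value=100_000)
print(head.groundtruth_file)  # neighbors_head_1p.parquet
```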