From 142820167b1edc4948f76049659377140de60ae9 Mon Sep 17 00:00:00 2001 From: "zhenshan.cao" Date: Fri, 10 Nov 2023 18:10:10 +0800 Subject: [PATCH] Add filter parameter to delete (#1779) Starting with Milvus 2.3.3, data can be deleted based on a filter expression. The new "filter" parameter of MilvusClient.delete is the entry point for this feature: all rows that match the specified filter expression will be deleted. The deletion condition must be either a list of primary keys to delete or a filter expression. If both are specified, or if neither is specified, an exception will be thrown. Signed-off-by: zhenshan.cao --- examples/hello_milvus_delete.py | 74 +++++++++++++++++++++++++ pymilvus/exceptions.py | 3 + pymilvus/milvus_client/milvus_client.py | 69 ++++++++++------------- 3 files changed, 107 insertions(+), 39 deletions(-) create mode 100644 examples/hello_milvus_delete.py diff --git a/examples/hello_milvus_delete.py b/examples/hello_milvus_delete.py new file mode 100644 index 000000000..5dd296590 --- /dev/null +++ b/examples/hello_milvus_delete.py @@ -0,0 +1,74 @@ +import time +import numpy as np +from pymilvus import ( + MilvusClient, + exceptions +) + +fmt = "\n=== {:30} ===\n" +dim = 8 +collection_name = "hello_milvus" +milvus_client = MilvusClient("http://localhost:19530") +milvus_client.drop_collection(collection_name) +milvus_client.create_collection(collection_name, dim, consistency_level="Strong", metric_type="L2") + +print("collections:", milvus_client.list_collections()) +print(f"{collection_name} :", milvus_client.describe_collection(collection_name)) +rng = np.random.default_rng(seed=19530) + +rows = [ + {"id": 1, "vector": rng.random((1, dim))[0], "a": 1}, + {"id": 2, "vector": rng.random((1, dim))[0], "b": 2}, + {"id": 3, "vector": rng.random((1, dim))[0], "c": 3}, + {"id": 4, "vector": rng.random((1, dim))[0], "d": 4}, + {"id": 5, "vector": rng.random((1, 
dim))[0], "e": 5}, + {"id": 6, "vector": rng.random((1, dim))[0], "f": 6}, +] + +print(fmt.format("Start inserting entities")) +pks = milvus_client.insert(collection_name, rows, progress_bar=True) +pks2 = milvus_client.insert(collection_name, {"id": 7, "vector": rng.random((1, dim))[0], "g": 1}) +pks.extend(pks2) + + +def fetch_data_by_pk(pk): + print(f"get primary key {pk} from {collection_name}") + pk_data = milvus_client.get(collection_name, pk) + + if pk_data: + print(f"data of primary key {pk} is", pk_data[0]) + else: + print(f"data of primary key {pk} is empty") + +fetch_data_by_pk(pks[2]) + +print(f"start to delete primary key {pks[2]} in collection {collection_name}") +milvus_client.delete(collection_name, pks = pks[2]) + +fetch_data_by_pk(pks[2]) + + +fetch_data_by_pk(pks[4]) +filter = "e == 5 or f == 6" +print(f"start to delete by expr {filter} in collection {collection_name}") +milvus_client.delete(collection_name, filter=filter) + +fetch_data_by_pk(pks[4]) + +print(f"start to delete by expr '{filter}' or by primary 4 in collection {collection_name}, expect get exception") +try: + milvus_client.delete(collection_name, pks = 4, filter=filter) +except Exception as e: + assert isinstance(e, exceptions.ParamError) + print("catch exception", e) + +print(f"start to delete without specify any expr '{filter}' or any primary key in collection {collection_name}, expect get exception") +try: + milvus_client.delete(collection_name) +except Exception as e: + print("catch exception", e) + +result = milvus_client.query(collection_name, "", output_fields = ["count(*)"]) +print(f"final entities in {collection_name} is {result[0]['count(*)']}") + +milvus_client.drop_collection(collection_name) diff --git a/pymilvus/exceptions.py b/pymilvus/exceptions.py index e71df737c..5b1db4269 100644 --- a/pymilvus/exceptions.py +++ b/pymilvus/exceptions.py @@ -215,3 +215,6 @@ class ExceptionsMessage: "Attempt to insert an unexpected field to collection without enabling dynamic field" 
) UpsertAutoIDTrue = "Upsert don't support autoid == true" + AmbiguousDeleteFilterParam = ( + "Ambiguous filter parameter, only one deletion condition can be specified." + ) diff --git a/pymilvus/milvus_client/milvus_client.py b/pymilvus/milvus_client/milvus_client.py index 6e0d02680..e6225672d 100644 --- a/pymilvus/milvus_client/milvus_client.py +++ b/pymilvus/milvus_client/milvus_client.py @@ -8,6 +8,7 @@ from pymilvus.exceptions import ( DataTypeNotMatchException, MilvusException, + ParamError, PrimaryKeyException, ) from pymilvus.orm import utility @@ -58,9 +59,7 @@ def __init__( self._using = self._create_connection( uri, user, password, db_name, token, timeout=timeout, **kwargs ) - self.is_self_hosted = bool( - utility.get_server_type(using=self._using) == "milvus", - ) + self.is_self_hosted = bool(utility.get_server_type(using=self._using) == "milvus") def create_collection( self, @@ -104,10 +103,7 @@ def create_collection( except Exception as ex: logger.error("Failed to create collection: %s", collection_name) raise ex from ex - index_params = { - "metric_type": metric_type, - "params": {}, - } + index_params = {"metric_type": metric_type, "params": {}} self._create_index(collection_name, vector_field_name, index_params, timeout=timeout) self._load(collection_name, timeout=timeout) @@ -121,21 +117,10 @@ def _create_index( """Create a index on the collection""" conn = self._get_connection() try: - conn.create_index( - collection_name, - vec_field_name, - index_params, - timeout=timeout, - ) - logger.debug( - "Successfully created an index on collection: %s", - collection_name, - ) + conn.create_index(collection_name, vec_field_name, index_params, timeout=timeout) + logger.debug("Successfully created an index on collection: %s", collection_name) except Exception as ex: - logger.error( - "Failed to create an index on collection: %s", - collection_name, - ) + logger.error("Failed to create an index on collection: %s", collection_name) raise ex from ex def 
insert( @@ -195,9 +180,7 @@ def insert( pks.extend(res.primary_keys) except Exception as ex: logger.error( - "Failed to insert batch starting at entity: %s/%s", - str(i), - str(len(data)), + "Failed to insert batch starting at entity: %s/%s", str(i), str(len(data)) ) raise ex from ex @@ -370,8 +353,9 @@ def get( def delete( self, collection_name: str, - pks: Union[list, str, int], + pks: Optional[Union[list, str, int]] = None, timeout: Optional[float] = None, + filter: Optional[str] = "", **kwargs, ): """Delete entries in the collection by their pk. @@ -390,7 +374,8 @@ def delete( Args: pks (list, str, int): The pk's to delete. Depending on pk_field type it can be int - or str or alist of either. + or str or alist of either. Default to None. + filter(str, optional): A filter to use for the deletion. Defaults to empty. timeout (int, optional): Timeout to use, overides the client level assigned at init. Defaults to None. """ @@ -398,17 +383,26 @@ def delete( if isinstance(pks, (int, str)): pks = [pks] - if len(pks) == 0: - return [] - + expr = "" conn = self._get_connection() - try: - schema_dict = conn.describe_collection(collection_name, timeout=timeout, **kwargs) - except Exception as ex: - logger.error("Failed to describe collection: %s", collection_name) - raise ex from ex + if pks: + try: + schema_dict = conn.describe_collection(collection_name, timeout=timeout, **kwargs) + except Exception as ex: + logger.error("Failed to describe collection: %s", collection_name) + raise ex from ex + + expr = self._pack_pks_expr(schema_dict, pks) + + if filter: + if expr: + raise ParamError(message=ExceptionsMessage.AmbiguousDeleteFilterParam) + + if not isinstance(filter, str): + raise DataTypeNotMatchException(message=ExceptionsMessage.ExprType % type(filter)) + + expr = filter - expr = self._pack_pks_expr(schema_dict, pks) ret_pks = [] try: res = conn.delete(collection_name, expr, timeout=timeout, **kwargs) @@ -600,8 +594,5 @@ def _load(self, collection_name: str, timeout: 
Optional[float] = None): try: conn.load_collection(collection_name, timeout=timeout) except MilvusException as ex: - logger.error( - "Failed to load collection: %s", - collection_name, - ) + logger.error("Failed to load collection: %s", collection_name) raise ex from ex