"""Demonstrate deleting entities from a Milvus collection with ``MilvusClient``.

The script (re)creates the ``hello_milvus`` collection on the server at
http://localhost:19530, inserts seven rows, then deletes entities by primary
key, by filter expression, and by both in one call, looking a key up after
each step. It finishes by printing the remaining entity count and dropping
the collection.
"""
import time

import numpy as np

fmt = "\n=== {:30} ===\n"
dim = 8
collection_name = "hello_milvus"


def build_rows(rng, dim):
    """Return the six demo rows inserted in bulk.

    Row ``i`` (1..6) has ``"id": i``, a random vector of length ``dim`` drawn
    from ``rng``, and one extra field named ``a``..``f`` whose value is ``i``.

    Args:
        rng: A ``numpy.random.Generator``; it is advanced once per row, in
            id order.
        dim: Length of each generated vector.

    Returns:
        list[dict]: The rows, ordered by id.
    """
    return [
        {"id": pk, "vector": rng.random((1, dim))[0], field: pk}
        for pk, field in enumerate("abcdef", start=1)
    ]


def format_lookup(pk, pk_data):
    """Return the line describing the result of looking up one primary key.

    Args:
        pk: The primary key that was requested.
        pk_data: The result of ``get()`` for that single key; falsy when
            nothing was found.

    Returns:
        str: ``"data of primary key <pk> is <entity>"`` or
        ``"data of primary key <pk> is empty"``.
    """
    if pk_data:
        # Only one key was requested, so a hit is read from index 0. Indexing
        # by the key's position in the inserted-pks list (pk_data[4]) would
        # raise IndexError whenever the entity is still present.
        return f"data of primary key {pk} is {pk_data[0]}"
    return f"data of primary key {pk} is empty"


def show_entity(client, name, pk, label="primary key"):
    """Fetch ``pk`` from collection ``name`` via ``client.get`` and print the outcome.

    Args:
        client: Object exposing ``get(collection_name, pk)``.
        name: Collection name.
        pk: Primary key to look up.
        label: Wording used in the announcement line, e.g.
            ``"deleted primary key"``.
    """
    print(f"get {label} {pk} from {name}")
    print(format_lookup(pk, client.get(name, pk)))


def main():
    """Run the delete walkthrough against the local Milvus server."""
    # Imported here rather than at module level so that the pure helpers above
    # remain importable on a machine without pymilvus installed.
    from pymilvus import (
        MilvusClient,
    )

    milvus_client = MilvusClient("http://localhost:19530")
    milvus_client.drop_collection(collection_name)
    milvus_client.create_collection(
        collection_name, dim, consistency_level="Strong", metric_type="L2"
    )

    print("collections:", milvus_client.list_collections())
    print(f"{collection_name} :", milvus_client.describe_collection(collection_name))

    # Fixed seed keeps the generated vectors reproducible between runs.
    rng = np.random.default_rng(seed=19530)
    rows = build_rows(rng, dim)

    print(fmt.format("Start inserting entities"))
    pks = milvus_client.insert(collection_name, rows, progress_bar=True)
    pks2 = milvus_client.insert(
        collection_name, {"id": 7, "vector": rng.random((1, dim))[0], "g": 1}
    )
    pks.extend(pks2)

    # Look up the third key before and after deleting the first two keys.
    show_entity(milvus_client, collection_name, pks[2])

    print(f"start to delete first 2 of primary keys in collection {collection_name}")
    milvus_client.delete(collection_name, pks=pks[0:2])

    show_entity(milvus_client, collection_name, pks[2])

    # Named filter_expr so the builtin filter() is not shadowed.
    filter_expr = "e == 5 or f == 6"
    print(f"start to delete by expr {filter_expr} in collection {collection_name}")
    milvus_client.delete(collection_name, filter=filter_expr)

    show_entity(milvus_client, collection_name, pks[4], label="deleted primary key")

    print(
        f"start to delete by expr {filter_expr} or by primary 4 in collection {collection_name}"
    )
    milvus_client.delete(collection_name, pks=4, filter=filter_expr)

    show_entity(milvus_client, collection_name, pks[3], label="deleted primary key")

    result = milvus_client.query(collection_name, "", output_fields=["count(*)"])
    print(result)
    print(f"final entities in {collection_name} is {result[0]['count(*)']}")

    milvus_client.drop_collection(collection_name)


if __name__ == "__main__":
    main()
def create_collection( self, @@ -104,10 +98,7 @@ def create_collection( except Exception as ex: logger.error("Failed to create collection: %s", collection_name) raise ex from ex - index_params = { - "metric_type": metric_type, - "params": {}, - } + index_params = {"metric_type": metric_type, "params": {}} self._create_index(collection_name, vector_field_name, index_params, timeout=timeout) self._load(collection_name, timeout=timeout) @@ -121,21 +112,10 @@ def _create_index( """Create a index on the collection""" conn = self._get_connection() try: - conn.create_index( - collection_name, - vec_field_name, - index_params, - timeout=timeout, - ) - logger.debug( - "Successfully created an index on collection: %s", - collection_name, - ) + conn.create_index(collection_name, vec_field_name, index_params, timeout=timeout) + logger.debug("Successfully created an index on collection: %s", collection_name) except Exception as ex: - logger.error( - "Failed to create an index on collection: %s", - collection_name, - ) + logger.error("Failed to create an index on collection: %s", collection_name) raise ex from ex def insert( @@ -195,9 +175,7 @@ def insert( pks.extend(res.primary_keys) except Exception as ex: logger.error( - "Failed to insert batch starting at entity: %s/%s", - str(i), - str(len(data)), + "Failed to insert batch starting at entity: %s/%s", str(i), str(len(data)) ) raise ex from ex @@ -370,8 +348,9 @@ def get( def delete( self, collection_name: str, - pks: Union[list, str, int], + pks: Optional[Union[list, str, int]] = None, timeout: Optional[float] = None, + filter: Optional[str] = "", **kwargs, ): """Delete entries in the collection by their pk. @@ -390,7 +369,8 @@ def delete( Args: pks (list, str, int): The pk's to delete. Depending on pk_field type it can be int - or str or alist of either. + or str or alist of either. Default to None. + filter(str, optional): A filter to use for the deletion. Defaults to empty. 
timeout (int, optional): Timeout to use, overides the client level assigned at init. Defaults to None. """ @@ -398,17 +378,22 @@ def delete( if isinstance(pks, (int, str)): pks = [pks] - if len(pks) == 0: - return [] - + expr = "" conn = self._get_connection() - try: - schema_dict = conn.describe_collection(collection_name, timeout=timeout, **kwargs) - except Exception as ex: - logger.error("Failed to describe collection: %s", collection_name) - raise ex from ex + if pks: + try: + schema_dict = conn.describe_collection(collection_name, timeout=timeout, **kwargs) + except Exception as ex: + logger.error("Failed to describe collection: %s", collection_name) + raise ex from ex + + expr = self._pack_pks_expr(schema_dict, pks) + + if filter: + if not isinstance(filter, str): + raise DataTypeNotMatchException(message=ExceptionsMessage.ExprType % type(filter)) + expr = f"({expr}) or ({filter})" if expr else filter - expr = self._pack_pks_expr(schema_dict, pks) ret_pks = [] try: res = conn.delete(collection_name, expr, timeout=timeout, **kwargs) @@ -600,8 +585,5 @@ def _load(self, collection_name: str, timeout: Optional[float] = None): try: conn.load_collection(collection_name, timeout=timeout) except MilvusException as ex: - logger.error( - "Failed to load collection: %s", - collection_name, - ) + logger.error("Failed to load collection: %s", collection_name) raise ex from ex