diff --git a/examples/hello_milvus_simple.py b/examples/hello_milvus_simple.py deleted file mode 100644 index c811c204e..000000000 --- a/examples/hello_milvus_simple.py +++ /dev/null @@ -1,63 +0,0 @@ -import time -import numpy as np -from pymilvus import ( - MilvusClient, -) - -fmt = "\n=== {:30} ===\n" -dim = 8 -collection_name = "hello_milvus" -milvus_client = MilvusClient("http://localhost:19530") -milvus_client.drop_collection(collection_name) -milvus_client.create_collection(collection_name, dim, consistency_level="Bounded", metric_type="L2", auto_id=True) - -print("collections:", milvus_client.list_collections()) -print(f"{collection_name} :", milvus_client.describe_collection(collection_name)) -rng = np.random.default_rng(seed=19530) - -rows = [ - {"vector": rng.random((1, dim))[0], "a": 1}, - {"vector": rng.random((1, dim))[0], "b": 1}, - {"vector": rng.random((1, dim))[0], "c": 1}, - {"vector": rng.random((1, dim))[0], "d": 1}, - {"vector": rng.random((1, dim))[0], "e": 1}, - {"vector": rng.random((1, dim))[0], "f": 1}, -] -print(fmt.format("Start inserting entities")) -pks = milvus_client.insert(collection_name, rows, progress_bar=True) -pks2 = milvus_client.insert(collection_name, {"vector": rng.random((1, dim))[0], "g": 1}) -pks.extend(pks2) -print(fmt.format("Start searching based on vector similarity")) - -print("len of pks:", len(pks), "first pk is :", pks[0]) - -print(f"get first primary key {pks[0]} from {collection_name}") -first_pk_data = milvus_client.get(collection_name, pks[0:1]) -print(f"data of primary key {pks[0]} is", first_pk_data) - -print(f"start to delete first 2 of primary keys in collection {collection_name}") -milvus_client.delete(collection_name, pks[0:2]) - -rng = np.random.default_rng(seed=19530) -vectors_to_search = rng.random((1, dim)) - -start_time = time.time() - -print(fmt.format(f"Start search with retrieve all fields.")) -result = milvus_client.search(collection_name, vectors_to_search, limit=3, output_fields=["pk", "a", "b"]) -end_time = time.time() - -for hits in result: - for hit in hits: - print(f"hit: {hit}") - -query_pks = [str(entry) for entry in pks[2:4]] -filter = f"id in [{','.join(query_pks)}]" - -# filter = f" id in [{','.join(pks[2:4])}]" -print(fmt.format(f"Start query({filter}) with retrieve all fields.")) - -filter_data = milvus_client.query(collection_name, filter) -print("filter_data:", filter_data) - -milvus_client.drop_collection(collection_name) diff --git a/examples/hello_milvus_simple2.py b/examples/hello_milvus_simple2.py deleted file mode 100644 index 81e3809de..000000000 --- a/examples/hello_milvus_simple2.py +++ /dev/null @@ -1,47 +0,0 @@ -import numpy as np -from pymilvus import MilvusClient, DataType - -dimension = 128 -collection_name = "books" -client = MilvusClient("http://localhost:19530") -client.drop_collection(collection_name) - -schema = client.create_schema(auto_id=True, enable_dynamic_field=True) -schema.add_field("id", DataType.INT64, is_primary=True) -schema.add_field("embeddings", DataType.FLOAT_VECTOR, dim=dimension) -schema.add_field("title", DataType.VARCHAR, max_length=64) - -index_param = client.prepare_index_params("embeddings", metric_type="L2") -client.create_collection_with_schema(collection_name, schema, index_param) - -info = client.describe_collection(collection_name) -print(f"{collection_name}'s info:{info}") - -rng = np.random.default_rng(seed=19530) -rows = [ - {"title": "The Catcher in the Rye", "embeddings": rng.random((1, dimension))[0], "a":1,}, - {"title": "Lord of the Flies", 
"embeddings": rng.random((1, dimension))[0], "b":2}, - {"title": "The Hobbit", "embeddings": rng.random((1, dimension))[0]}, - {"title": "The Outsiders", "embeddings": rng.random((1, dimension))[0]}, - {"title": "The Old Man and the Sea", "embeddings": rng.random((1, dimension))[0]}, -] - -client.insert(collection_name, rows) -client.insert(collection_name, - {"title": "The Great Gatsby", "embeddings": rng.random((1, dimension))[0]}) - -search_vec = rng.random((1, dimension)) -result = client.search(collection_name, search_vec, limit=3, output_fields=["title"]) -# we may get empty result -for i, hits in enumerate(result): - if not hits: - print(f"get empty results for search_vec[{i}]") - continue - for hit in hits: - print(f"hit: {hit}") - -# use strong consistency level ensure that we can see the data we inserted before -result = client.search(collection_name, search_vec, limit=3, output_fields=["title", "*"], consistency_level="Strong") -for hits in result: - for hit in hits: - print(f"hit: {hit}") diff --git a/examples/milvus_client/alias.py b/examples/milvus_client/alias.py new file mode 100644 index 000000000..0582337d8 --- /dev/null +++ b/examples/milvus_client/alias.py @@ -0,0 +1,67 @@ +import time +import numpy as np +from pymilvus import ( + MilvusClient, +) + +fmt = "\n=== {:30} ===\n" +dim = 8 +collection_name = "hello_milvus" +milvus_client = MilvusClient("http://localhost:19530") +milvus_client.drop_collection(collection_name) +milvus_client.create_collection(collection_name, dim, consistency_level="Strong", metric_type="L2", auto_id=True) + +collection_name2 = "hello_milvus2" +milvus_client.drop_collection(collection_name2) +milvus_client.create_collection(collection_name2, dim, consistency_level="Strong", metric_type="L2", auto_id=True) + + +print("collections:", milvus_client.list_collections()) + +desc_c1 = milvus_client.describe_collection(collection_name) +print(f"{collection_name} :", desc_c1) + +rng = np.random.default_rng(seed=19530) + +rows = [ + {"vector": rng.random((1, dim))[0], "a": 100}, + {"vector": rng.random((1, dim))[0], "b": 200}, + {"vector": rng.random((1, dim))[0], "c": 300}, +] + +print(fmt.format(f"Start inserting entities to {collection_name}")) +insert_result = milvus_client.insert(collection_name, rows) +print(insert_result) + +rows = [ + {"vector": rng.random((1, dim))[0], "d": 400}, + {"vector": rng.random((1, dim))[0], "e": 500}, + {"vector": rng.random((1, dim))[0], "f": 600}, +] + +print(fmt.format(f"Start inserting entities to {collection_name2}")) +insert_result2 = milvus_client.insert(collection_name2, rows) +print(insert_result2) + +alias = "alias_hello_milvus" +milvus_client.create_alias(collection_name, alias) + +assert milvus_client.describe_collection(alias) == milvus_client.describe_collection(collection_name) + +milvus_client.alter_alias(collection_name2, alias) +assert milvus_client.describe_collection(alias) == milvus_client.describe_collection(collection_name2) + +query_results = milvus_client.query(alias, filter= "f == 600") +print("results of query 'f == 600' is ") +for ret in query_results: + print(ret) + + +milvus_client.drop_alias(alias) +has_collection = milvus_client.has_collection(alias) +assert not has_collection +has_collection = milvus_client.has_collection(collection_name2) +assert has_collection + +milvus_client.drop_collection(collection_name) +milvus_client.drop_collection(collection_name2) diff --git a/examples/milvus_client/customize_schema.py b/examples/milvus_client/customize_schema.py new file mode 100644 index 
000000000..2b0b54322 --- /dev/null +++ b/examples/milvus_client/customize_schema.py @@ -0,0 +1,70 @@ +import time +import numpy as np +from pymilvus import ( + MilvusClient, + DataType +) + +fmt = "\n=== {:30} ===\n" +dim = 8 +collection_name = "hello_milvus" +milvus_client = MilvusClient("http://localhost:19530") + +has_collection = milvus_client.has_collection(collection_name, timeout=5) +if has_collection: + milvus_client.drop_collection(collection_name) + +schema = milvus_client.create_schema(enable_dynamic_field=True) +schema.add_field("id", DataType.INT64, is_primary=True) +schema.add_field("embeddings", DataType.FLOAT_VECTOR, dim=dim) +schema.add_field("title", DataType.VARCHAR, max_length=64) + + +index_params = milvus_client.prepare_index_params() +index_params.add_index(field_name = "embeddings", metric_type="L2") +milvus_client.create_collection(collection_name, schema=schema, index_params=index_params, consistency_level="Strong") + +print(fmt.format(" all collections ")) +print(milvus_client.list_collections()) + +print(fmt.format(f"schema of collection {collection_name}")) +print(milvus_client.describe_collection(collection_name)) + +rng = np.random.default_rng(seed=19530) +rows = [ + {"id": 1, "embeddings": rng.random((1, dim))[0], "a": 100, "title": "t1"}, + {"id": 2, "embeddings": rng.random((1, dim))[0], "b": 200, "title": "t2"}, + {"id": 3, "embeddings": rng.random((1, dim))[0], "c": 300, "title": "t3"}, + {"id": 4, "embeddings": rng.random((1, dim))[0], "d": 400, "title": "t4"}, + {"id": 5, "embeddings": rng.random((1, dim))[0], "e": 500, "title": "t5"}, + {"id": 6, "embeddings": rng.random((1, dim))[0], "f": 600, "title": "t6"}, +] + +print(fmt.format("Start inserting entities")) +insert_result = milvus_client.insert(collection_name, rows) +print(fmt.format("Inserting entities done")) +print(insert_result) + + +print(fmt.format("Start load collection")) +milvus_client.load_collection(collection_name) + +print(fmt.format("Start query by specifying primary keys")) +query_results = milvus_client.query(collection_name, ids=[2]) +print(query_results[0]) + +print(fmt.format("Start query by specifying filtering expression")) +query_results = milvus_client.query(collection_name, filter= "f == 600 or title == 't2'") +for ret in query_results: + print(ret) + +rng = np.random.default_rng(seed=19530) +vectors_to_search = rng.random((1, dim)) + +print(fmt.format("Start search, retrieving several fields.")) +result = milvus_client.search(collection_name, vectors_to_search, limit=3, output_fields=["pk", "a", "b"]) +for hits in result: + for hit in hits: + print(f"hit: {hit}") + +milvus_client.drop_collection(collection_name) diff --git a/examples/milvus_client/index.py b/examples/milvus_client/index.py new file mode 100644 index 000000000..aaf0ff867 --- /dev/null +++ b/examples/milvus_client/index.py @@ -0,0 +1,87 @@ +import time +import numpy as np +from pymilvus import ( + MilvusClient, + DataType +) + +fmt = "\n=== {:30} ===\n" +dim = 8 +collection_name = "hello_milvus" +milvus_client = MilvusClient("http://localhost:19530") + +has_collection = milvus_client.has_collection(collection_name, timeout=5) +if has_collection: + milvus_client.drop_collection(collection_name) + +schema = milvus_client.create_schema(enable_dynamic_field=True) +schema.add_field("id", DataType.INT64, is_primary=True) +schema.add_field("embeddings", DataType.FLOAT_VECTOR, dim=dim) +schema.add_field("title", DataType.VARCHAR, max_length=64) + +# collection is not loaded after creation
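+# (no index_params are passed to create_collection here, so the collection is only
+# defined at this point; the index is built and the collection loaded explicitly
+# further down, which is why query/search happen only after load_collection)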
+milvus_client.create_collection(collection_name, schema=schema, consistency_level="Strong") + +rng = np.random.default_rng(seed=19530) +rows = [ + {"id": 1, "embeddings": rng.random((1, dim))[0], "a": 100, "title": "t1"}, + {"id": 2, "embeddings": rng.random((1, dim))[0], "b": 200, "title": "t2"}, + {"id": 3, "embeddings": rng.random((1, dim))[0], "c": 300, "title": "t3"}, + {"id": 4, "embeddings": rng.random((1, dim))[0], "d": 400, "title": "t4"}, + {"id": 5, "embeddings": rng.random((1, dim))[0], "e": 500, "title": "t5"}, + {"id": 6, "embeddings": rng.random((1, dim))[0], "f": 600, "title": "t6"}, +] + +print(fmt.format("Start inserting entities")) +insert_result = milvus_client.insert(collection_name, rows) +print(fmt.format("Inserting entities done")) +print(insert_result) + +index_params = milvus_client.prepare_index_params() +index_params.add_index(field_name = "embeddings", metric_type="L2") +index_params.add_index(field_name = "title", index_type = "TRIE", index_name="my_trie") + +print(fmt.format("Start create index")) +milvus_client.create_index(collection_name, index_params) + + +index_names = milvus_client.list_indexes(collection_name) +print(f"index names for {collection_name}:", index_names) +for index_name in index_names: + index_info = milvus_client.describe_index(collection_name, index_name=index_name) + print(f"index info for index {index_name} is:", index_info) + +print(fmt.format("Start load collection")) +milvus_client.load_collection(collection_name) + +print(fmt.format("Start query by specifying primary keys")) +query_results = milvus_client.query(collection_name, ids=[2]) +print(query_results[0]) + +print(fmt.format("Start query by specifying filtering expression")) +query_results = milvus_client.query(collection_name, filter= "f == 600 or title == 't2'") +for ret in query_results: + print(ret) + +vectors_to_search = rng.random((1, dim)) +print(fmt.format("Start search, retrieving several fields.")) +result = milvus_client.search(collection_name, vectors_to_search, limit=3, output_fields=["title"]) +for hits in result: + for hit in hits: + print(f"hit: {hit}") + + + +field_index_names = milvus_client.list_indexes(collection_name, field_name = "embeddings") +print(f"index names for {collection_name}`s field embeddings:", field_index_names) + +# dropping an index while the collection is loaded is expected to fail +try: + milvus_client.drop_index(collection_name, "my_trie") +except Exception as e: + print(f"caught {e}") + +milvus_client.release_collection(collection_name) + +milvus_client.drop_index(collection_name, "my_trie") + +milvus_client.drop_collection(collection_name) diff --git a/examples/milvus_client/partition.py b/examples/milvus_client/partition.py new file mode 100644 index 000000000..7466c034a --- /dev/null +++ b/examples/milvus_client/partition.py @@ -0,0 +1,85 @@ +import time +import numpy as np +from pymilvus import ( + MilvusClient, +) + +fmt = "\n=== {:30} ===\n" +dim = 8 +collection_name = "hello_milvus" +milvus_client = MilvusClient("http://localhost:19530") + +has_collection = milvus_client.has_collection(collection_name, timeout=5) +if has_collection: + milvus_client.drop_collection(collection_name) +milvus_client.create_collection(collection_name, dim, consistency_level="Strong", metric_type="L2") + +print(fmt.format(" all collections ")) +print(milvus_client.list_collections()) + +print(fmt.format(f"schema of collection {collection_name}")) +print(milvus_client.describe_collection(collection_name)) + +rng = np.random.default_rng(seed=19530) + +milvus_client.create_partition(collection_name, partition_name = "p1")
"p1") +milvus_client.insert(collection_name, {"id": 1, "vector": rng.random((1, dim))[0], "a": 100}, partition_name = "p1") +milvus_client.insert(collection_name, {"id": 2, "vector": rng.random((1, dim))[0], "b": 200}, partition_name = "p1") +milvus_client.insert(collection_name, {"id": 3, "vector": rng.random((1, dim))[0], "c": 300}, partition_name = "p1") + +milvus_client.create_partition(collection_name, partition_name = "p2") +milvus_client.insert(collection_name, {"id": 4, "vector": rng.random((1, dim))[0], "e": 400}, partition_name = "p2") +milvus_client.insert(collection_name, {"id": 5, "vector": rng.random((1, dim))[0], "f": 500}, partition_name = "p2") +milvus_client.insert(collection_name, {"id": 6, "vector": rng.random((1, dim))[0], "g": 600}, partition_name = "p2") + +has_p1 = milvus_client.has_partition(collection_name, "p1") +print("has partition p1", has_p1) + +has_p3 = milvus_client.has_partition(collection_name, "p3") +print("has partition p3", has_p3) + +partitions = milvus_client.list_partitions(collection_name) +print("partitions:", partitions) + +milvus_client.release_collection(collection_name) +milvus_client.load_partitions(collection_name, partition_names =["p1", "p2"]) + +print(fmt.format("Start search in partiton p1")) +vectors_to_search = rng.random((1, dim)) +result = milvus_client.search(collection_name, vectors_to_search, limit=3, output_fields=["pk", "a", "b"], partition_names = ["p1"]) +for hits in result: + for hit in hits: + print(f"hit: {hit}") + +milvus_client.release_partitions(collection_name, partition_names = ["p1"]) +milvus_client.drop_partition(collection_name, partition_name = "p1", timeout = 2.0) +print("successfully drop partition p1") + +try: + milvus_client.drop_partition(collection_name, partition_name = "p2", timeout = 2.0) +except Exception as e: + print(f"cacthed {e}") + +has_p1 = milvus_client.has_partition(collection_name, "p1") +print("has partition of p1:", has_p1) + +print(fmt.format("Start query by specifying primary keys")) +query_results = milvus_client.query(collection_name, ids=[2]) +assert len(query_results) == 0 + +print(fmt.format("Start query by specifying primary keys")) +query_results = milvus_client.query(collection_name, ids=[4]) +print(query_results[0]) + +print(fmt.format("Start query by specifying filtering expression")) +query_results = milvus_client.query(collection_name, filter= "f == 500") +for ret in query_results: + print(ret) + +print(fmt.format(f"Start search with retrieve serveral fields.")) +result = milvus_client.search(collection_name, vectors_to_search, limit=3, output_fields=["pk", "a", "b"]) +for hits in result: + for hit in hits: + print(f"hit: {hit}") + +milvus_client.drop_collection(collection_name) diff --git a/examples/milvus_client/rbac.py b/examples/milvus_client/rbac.py new file mode 100644 index 000000000..f7ecc1057 --- /dev/null +++ b/examples/milvus_client/rbac.py @@ -0,0 +1,104 @@ +import time +import numpy as np +from pymilvus import ( + MilvusClient, +) + +super_user = "root" +super_password = "Milvus" + +fmt = "\n=== {:30} ===\n" +dim = 8 + +collection_name = "hello_milvus" +milvus_client = MilvusClient("http://localhost:19530", user=super_user, password=super_password) + +milvus_client.drop_user("user1") +milvus_client.drop_user("user2") +milvus_client.drop_user("user3") + +milvus_client.create_user("user1", "password1") +milvus_client.create_user("user2", "password2") +milvus_client.create_user("user3", "password3") + +users = milvus_client.list_users() +print("users:", users) + 
+milvus_client.drop_user("user3") + +users = milvus_client.list_users() +print("after drop operation, users:", users) + + +db_rw_privileges = [ + {"object_type": "Global", "object_name": "*", "privilege": "CreateCollection"}, + {"object_type": "Global", "object_name": "*", "privilege": "DropCollection"}, + {"object_type": "Global", "object_name": "*", "privilege": "DescribeCollection"}, + {"object_type": "Global", "object_name": "*", "privilege": "ShowCollections"}, + {"object_type": "Collection", "object_name": "*", "privilege": "Search"}, + {"object_type": "Collection", "object_name": "*", "privilege": "Query"}, + {"object_type": "Collection", "object_name": "*", "privilege": "CreateIndex"}, + {"object_type": "Collection", "object_name": "*", "privilege": "Load"}, + {"object_type": "Collection", "object_name": "*", "privilege": "Release"}, + {"object_type": "Collection", "object_name": "*", "privilege": "Delete"}, + {"object_type": "Collection", "object_name": "*", "privilege": "Insert"}, +] + +db_ro_privileges = [ + {"object_type": "Global", "object_name": "*", "privilege": "DescribeCollection"}, + {"object_type": "Global", "object_name": "*", "privilege": "ShowCollections"}, + {"object_type": "Collection", "object_name": "*", "privilege": "Search"}, + {"object_type": "Collection", "object_name": "*", "privilege": "Query"}, +] + +role_db_rw = "db_rw" +role_db_ro = "db_ro" + +current_roles = milvus_client.list_roles() +print("current roles:", current_roles) + +# a role must have all its privileges revoked before it can be dropped +for role in [role_db_rw, role_db_ro]: + if role in current_roles: + privileges = milvus_client.describe_role(role) + for item in privileges: + milvus_client.revoke_privilege(role, item["object_type"], item["privilege"], item["object_name"]) + + milvus_client.drop_role(role) + + +milvus_client.create_role(role_db_rw) +for item in db_rw_privileges: + milvus_client.grant_privilege(role_db_rw, item["object_type"], item["privilege"], item["object_name"]) + + +milvus_client.create_role(role_db_ro) +for item in db_ro_privileges: + milvus_client.grant_privilege(role_db_ro, item["object_type"], item["privilege"], item["object_name"]) + + +roles = milvus_client.list_roles() +print("roles:", roles) +for role in roles: + privileges = milvus_client.describe_role(role) + print(f"privileges for {role}:", privileges) + + +user1_info = milvus_client.describe_user("user1") +print("user info for user1:", user1_info) +print(f"grant {role_db_rw} to user1") +milvus_client.grant_role("user1", role_db_rw) +user1_info = milvus_client.describe_user("user1") +print("user info for user1:", user1_info) + +milvus_client.grant_role("user2", role_db_ro) +milvus_client.grant_role("user2", role_db_rw) + +user2_info = milvus_client.describe_user("user2") +print("user info for user2:", user2_info) +print(f"revoke {role_db_rw} from user2") +milvus_client.revoke_role("user2", role_db_rw) +user2_info = milvus_client.describe_user("user2") +print("user info for user2:", user2_info) + +user3_info = milvus_client.describe_user("user3") +print("user info for user3:", user3_info) + diff --git a/examples/milvus_client/simple.py b/examples/milvus_client/simple.py new file mode 100644 index 000000000..4e1006482 --- /dev/null +++ b/examples/milvus_client/simple.py @@ -0,0 +1,73 @@ +import time +import numpy as np +from pymilvus import ( + MilvusClient, +) + +fmt = "\n=== {:30} ===\n" +dim = 8 +collection_name = "hello_milvus" +milvus_client = MilvusClient("http://localhost:19530") + +has_collection = milvus_client.has_collection(collection_name, timeout=5) +if has_collection: + milvus_client.drop_collection(collection_name)
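+# passing only a dimension takes the fast creation path: a default schema (an "id"
+# primary field plus a "vector" field) is generated, a default index is created,
+# and the collection is loaded, so it is immediately ready for search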
+milvus_client.create_collection(collection_name, dim, consistency_level="Strong", metric_type="L2") + +print(fmt.format(" all collections ")) +print(milvus_client.list_collections()) + +print(fmt.format(f"schema of collection {collection_name}")) +print(milvus_client.describe_collection(collection_name)) + +rng = np.random.default_rng(seed=19530) +rows = [ + {"id": 1, "vector": rng.random((1, dim))[0], "a": 100}, + {"id": 2, "vector": rng.random((1, dim))[0], "b": 200}, + {"id": 3, "vector": rng.random((1, dim))[0], "c": 300}, + {"id": 4, "vector": rng.random((1, dim))[0], "d": 400}, + {"id": 5, "vector": rng.random((1, dim))[0], "e": 500}, + {"id": 6, "vector": rng.random((1, dim))[0], "f": 600}, +] + +print(fmt.format("Start inserting entities")) +insert_result = milvus_client.insert(collection_name, rows) +print(fmt.format("Inserting entities done")) +print(insert_result) + +print(fmt.format("Start query by specifying primary keys")) +query_results = milvus_client.query(collection_name, ids=[2]) +print(query_results[0]) + +upsert_ret = milvus_client.upsert(collection_name, {"id": 2, "vector": rng.random((1, dim))[0], "g": 100}) +print(upsert_ret) + +print(fmt.format("Start query by specifying primary keys")) +query_results = milvus_client.query(collection_name, ids=[2]) +print(query_results[0]) + + +print(fmt.format("Start query by specifying filtering expression")) +query_results = milvus_client.query(collection_name, filter= "f == 600") +for ret in query_results: + print(ret) + + +print(f"start to delete by specifying ids in collection {collection_name}") +delete_result = milvus_client.delete(collection_name, ids=[6]) +print(delete_result) + +print(fmt.format("Start query by specifying filtering expression")) +query_results = milvus_client.query(collection_name, filter= "f == 600") +assert len(query_results) == 0 + +rng = np.random.default_rng(seed=19530) +vectors_to_search = rng.random((1, dim)) + +print(fmt.format("Start search, retrieving several fields.")) +result = milvus_client.search(collection_name, vectors_to_search, limit=3, output_fields=["pk", "a", "b"]) +for hits in result: + for hit in hits: + print(f"hit: {hit}") + +milvus_client.drop_collection(collection_name) diff --git a/examples/milvus_client/simple_auto_id.py b/examples/milvus_client/simple_auto_id.py new file mode 100644 index 000000000..873303df1 --- /dev/null +++ b/examples/milvus_client/simple_auto_id.py @@ -0,0 +1,60 @@ +import time +import numpy as np +from pymilvus import ( + MilvusClient, +) + +fmt = "\n=== {:30} ===\n" +dim = 8 +collection_name = "hello_milvus" +milvus_client = MilvusClient("http://localhost:19530") + +has_collection = milvus_client.has_collection(collection_name, timeout=5) +if has_collection: + milvus_client.drop_collection(collection_name) +milvus_client.create_collection(collection_name, dim, consistency_level="Strong", metric_type="L2", auto_id=True) + +print(fmt.format(" all collections ")) +print(milvus_client.list_collections()) + +print(fmt.format(f"schema of collection {collection_name}")) +print(milvus_client.describe_collection(collection_name)) + +rng = np.random.default_rng(seed=19530) +rows = [ + {"vector": rng.random((1, dim))[0], "a": 100}, + {"vector": rng.random((1, dim))[0], "b": 200}, + {"vector": rng.random((1, dim))[0], "c": 300}, + {"vector": rng.random((1, dim))[0], "d": 400}, + {"vector": rng.random((1, dim))[0], "e": 500}, + {"vector": rng.random((1, dim))[0], "f": 600}, +] + +print(fmt.format("Start inserting entities"))
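+# with auto_id=True the server generates the primary keys; note the reworked
+# insert API returns counts such as {'insert_count': 6} instead of the pks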
+insert_result = milvus_client.insert(collection_name, rows) +print("insert done:", insert_result) + +print(fmt.format("Start query by specifying filter")) +query_results = milvus_client.query(collection_name, filter= "f == 600") +for ret in query_results: + print(ret) + + +print(f"start to delete by specifying filter in collection {collection_name}") +delete_result = milvus_client.delete(collection_name, filter = "f == 600") +print(delete_result) + +print(fmt.format("Start query by specifying filtering expression")) +query_results = milvus_client.query(collection_name, filter= "f == 600") +assert len(query_results) == 0 + +rng = np.random.default_rng(seed=19530) +vectors_to_search = rng.random((1, dim)) + +print(fmt.format("Start search, retrieving several fields.")) +result = milvus_client.search(collection_name, vectors_to_search, limit=3, output_fields=["pk", "a", "b"]) +for hits in result: + for hit in hits: + print(f"hit: {hit}") + +milvus_client.drop_collection(collection_name) diff --git a/pymilvus/client/abstract.py b/pymilvus/client/abstract.py index f278bfddc..78235b8bb 100644 --- a/pymilvus/client/abstract.py +++ b/pymilvus/client/abstract.py @@ -54,12 +54,16 @@ def __pack(self, raw: Any): self.params[type_param.key] = type_param.value if type_param.key in ["dim"]: self.params[type_param.key] = int(type_param.value) - if ( - type_param.key in [Config.MaxVarCharLengthKey] - and raw.data_type == DataType.VARCHAR + if type_param.key in [Config.MaxVarCharLengthKey] and raw.data_type in ( + DataType.VARCHAR, + DataType.ARRAY, ): self.params[type_param.key] = int(type_param.value) + # TO-DO: use constants defined in orm + if type_param.key in ["max_capacity"] and raw.data_type == DataType.ARRAY: + self.params[type_param.key] = int(type_param.value) + index_dict = {} for index_param in raw.index_params: if index_param.key == "params": diff --git a/pymilvus/client/grpc_handler.py b/pymilvus/client/grpc_handler.py index 41d60a43c..8190cfc7f 100644 --- a/pymilvus/client/grpc_handler.py +++ b/pymilvus/client/grpc_handler.py @@ -337,6 +337,9 @@ def has_collection(self, collection_name: str, timeout: Optional[float] = None, ): return False + if reply.status.error_code == common_pb2.CollectionNotExists: + return False + if is_successful(reply.status): return True @@ -386,7 +389,7 @@ def rename_collections( @retry_on_rpc_failure() def create_partition( - self, collection_name: str, partition_name: str, timeout: Optional[float] = None + self, collection_name: str, partition_name: str, timeout: Optional[float] = None, **kwargs ): check_pass_param(collection_name=collection_name, partition_name=partition_name) request = Prepare.create_partition_request(collection_name, partition_name) @@ -396,7 +399,7 @@ def create_partition( @retry_on_rpc_failure() def drop_partition( - self, collection_name: str, partition_name: str, timeout: Optional[float] = None + self, collection_name: str, partition_name: str, timeout: Optional[float] = None, **kwargs ): check_pass_param(collection_name=collection_name, partition_name=partition_name) request = Prepare.drop_partition_request(collection_name, partition_name) @@ -434,7 +437,7 @@ def get_partition_info( return info_dict @retry_on_rpc_failure() - def list_partitions(self, collection_name: str, timeout: Optional[float] = None): + def list_partitions(self, collection_name: str, timeout: Optional[float] = None, **kwargs): check_pass_param(collection_name=collection_name) request = Prepare.show_partitions_request(collection_name) @@ -1123,7
+1126,7 @@ def can_loop(t: int) -> bool: ) @retry_on_rpc_failure() - def release_collection(self, collection_name: str, timeout: Optional[float] = None): + def release_collection(self, collection_name: str, timeout: Optional[float] = None, **kwargs): check_pass_param(collection_name=collection_name) request = Prepare.release_collection("", collection_name) rf = self._stub.ReleaseCollection.future(request, timeout=timeout) @@ -1258,7 +1261,11 @@ def load_partitions_progress( @retry_on_rpc_failure() def release_partitions( - self, collection_name: str, partition_names: List[str], timeout: Optional[float] = None + self, + collection_name: str, + partition_names: List[str], + timeout: Optional[float] = None, + **kwargs, ): check_pass_param(collection_name=collection_name, partition_name_array=partition_names) request = Prepare.release_partitions("", collection_name, partition_names) @@ -1367,7 +1374,7 @@ def drop_index( timeout: Optional[float] = None, **kwargs, ): - check_pass_param(collection_name=collection_name, field_name=field_name) + check_pass_param(collection_name=collection_name) request = Prepare.drop_index_request(collection_name, field_name, index_name) future = self._stub.DropIndex.future(request, timeout=timeout) response = future.result() diff --git a/pymilvus/client/types.py b/pymilvus/client/types.py index 22a50bf60..6a7b1f0b1 100644 --- a/pymilvus/client/types.py +++ b/pymilvus/client/types.py @@ -627,6 +627,17 @@ def grantor_name(self): def privilege(self): return self._privilege + def __iter__(self): + yield "object_type", self.object + yield "object_name", self.object_name + if self.db_name: + yield "db_name", self.db_name + + yield "role_name", self.role_name + yield "privilege", self.privilege + if self.grantor_name: + yield "grantor_name", self.grantor_name + class GrantInfo: """ diff --git a/pymilvus/exceptions.py b/pymilvus/exceptions.py index 28e8c9a83..47e774c9d 100644 --- a/pymilvus/exceptions.py +++ b/pymilvus/exceptions.py @@ -213,3 +213,6 @@ class ExceptionsMessage: AmbiguousDeleteFilterParam = ( "Ambiguous filter parameter, only one deletion condition can be specified." ) + AmbiguousQueryFilterParam = ( + "Ambiguous parameter, either ids or filter should be specified, cannot support both." 
+ ) diff --git a/pymilvus/grpc_gen/milvus-proto b/pymilvus/grpc_gen/milvus-proto index b5442d755..39bce6abb 160000 --- a/pymilvus/grpc_gen/milvus-proto +++ b/pymilvus/grpc_gen/milvus-proto @@ -1 +1 @@ -Subproject commit b5442d755fa41a6684e381674e5e129ec483fc6f +Subproject commit 39bce6abb18ff41132895f33dbdc58f57515f80b diff --git a/pymilvus/grpc_gen/schema_pb2.py b/pymilvus/grpc_gen/schema_pb2.py index fcf5a934d..1159da1c3 100644 --- a/pymilvus/grpc_gen/schema_pb2.py +++ b/pymilvus/grpc_gen/schema_pb2.py @@ -16,7 +16,7 @@ from google.protobuf import descriptor_pb2 as google_dot_protobuf_dot_descriptor__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0cschema.proto\x12\x13milvus.proto.schema\x1a\x0c\x63ommon.proto\x1a google/protobuf/descriptor.proto\"\xd7\x03\n\x0b\x46ieldSchema\x12\x0f\n\x07\x66ieldID\x18\x01 \x01(\x03\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x16\n\x0eis_primary_key\x18\x03 \x01(\x08\x12\x13\n\x0b\x64\x65scription\x18\x04 \x01(\t\x12\x30\n\tdata_type\x18\x05 \x01(\x0e\x32\x1d.milvus.proto.schema.DataType\x12\x36\n\x0btype_params\x18\x06 \x03(\x0b\x32!.milvus.proto.common.KeyValuePair\x12\x37\n\x0cindex_params\x18\x07 \x03(\x0b\x32!.milvus.proto.common.KeyValuePair\x12\x0e\n\x06\x61utoID\x18\x08 \x01(\x08\x12.\n\x05state\x18\t \x01(\x0e\x32\x1f.milvus.proto.schema.FieldState\x12\x33\n\x0c\x65lement_type\x18\n \x01(\x0e\x32\x1d.milvus.proto.schema.DataType\x12\x36\n\rdefault_value\x18\x0b \x01(\x0b\x32\x1f.milvus.proto.schema.ValueField\x12\x12\n\nis_dynamic\x18\x0c \x01(\x08\x12\x18\n\x10is_partition_key\x18\r \x01(\x08\"\x99\x01\n\x10\x43ollectionSchema\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x13\n\x0b\x64\x65scription\x18\x02 \x01(\t\x12\x12\n\x06\x61utoID\x18\x03 \x01(\x08\x42\x02\x18\x01\x12\x30\n\x06\x66ields\x18\x04 \x03(\x0b\x32 .milvus.proto.schema.FieldSchema\x12\x1c\n\x14\x65nable_dynamic_field\x18\x05 \x01(\x08\"\x19\n\tBoolArray\x12\x0c\n\x04\x64\x61ta\x18\x01 \x03(\x08\"\x18\n\x08IntArray\x12\x0c\n\x04\x64\x61ta\x18\x01 \x03(\x05\"\x19\n\tLongArray\x12\x0c\n\x04\x64\x61ta\x18\x01 \x03(\x03\"\x1a\n\nFloatArray\x12\x0c\n\x04\x64\x61ta\x18\x01 \x03(\x02\"\x1b\n\x0b\x44oubleArray\x12\x0c\n\x04\x64\x61ta\x18\x01 \x03(\x01\"\x1a\n\nBytesArray\x12\x0c\n\x04\x64\x61ta\x18\x01 \x03(\x0c\"\x1b\n\x0bStringArray\x12\x0c\n\x04\x64\x61ta\x18\x01 \x03(\t\"q\n\nArrayArray\x12.\n\x04\x64\x61ta\x18\x01 \x03(\x0b\x32 .milvus.proto.schema.ScalarField\x12\x33\n\x0c\x65lement_type\x18\x02 \x01(\x0e\x32\x1d.milvus.proto.schema.DataType\"\x19\n\tJSONArray\x12\x0c\n\x04\x64\x61ta\x18\x01 \x03(\x0c\"\xac\x01\n\nValueField\x12\x13\n\tbool_data\x18\x01 \x01(\x08H\x00\x12\x12\n\x08int_data\x18\x02 \x01(\x05H\x00\x12\x13\n\tlong_data\x18\x03 \x01(\x03H\x00\x12\x14\n\nfloat_data\x18\x04 \x01(\x02H\x00\x12\x15\n\x0b\x64ouble_data\x18\x05 \x01(\x01H\x00\x12\x15\n\x0bstring_data\x18\x06 \x01(\tH\x00\x12\x14\n\nbytes_data\x18\x07 \x01(\x0cH\x00\x42\x06\n\x04\x64\x61ta\"\xfe\x03\n\x0bScalarField\x12\x33\n\tbool_data\x18\x01 \x01(\x0b\x32\x1e.milvus.proto.schema.BoolArrayH\x00\x12\x31\n\x08int_data\x18\x02 \x01(\x0b\x32\x1d.milvus.proto.schema.IntArrayH\x00\x12\x33\n\tlong_data\x18\x03 \x01(\x0b\x32\x1e.milvus.proto.schema.LongArrayH\x00\x12\x35\n\nfloat_data\x18\x04 \x01(\x0b\x32\x1f.milvus.proto.schema.FloatArrayH\x00\x12\x37\n\x0b\x64ouble_data\x18\x05 \x01(\x0b\x32 .milvus.proto.schema.DoubleArrayH\x00\x12\x37\n\x0bstring_data\x18\x06 \x01(\x0b\x32 .milvus.proto.schema.StringArrayH\x00\x12\x35\n\nbytes_data\x18\x07 
\x01(\x0b\x32\x1f.milvus.proto.schema.BytesArrayH\x00\x12\x35\n\narray_data\x18\x08 \x01(\x0b\x32\x1f.milvus.proto.schema.ArrayArrayH\x00\x12\x33\n\tjson_data\x18\t \x01(\x0b\x32\x1e.milvus.proto.schema.JSONArrayH\x00\x42\x06\n\x04\x64\x61ta\"\xa9\x01\n\x0bVectorField\x12\x0b\n\x03\x64im\x18\x01 \x01(\x03\x12\x37\n\x0c\x66loat_vector\x18\x02 \x01(\x0b\x32\x1f.milvus.proto.schema.FloatArrayH\x00\x12\x17\n\rbinary_vector\x18\x03 \x01(\x0cH\x00\x12\x18\n\x0e\x66loat16_vector\x18\x04 \x01(\x0cH\x00\x12\x19\n\x0f\x62\x66loat16_vector\x18\x05 \x01(\x0cH\x00\x42\x06\n\x04\x64\x61ta\"\xe5\x01\n\tFieldData\x12+\n\x04type\x18\x01 \x01(\x0e\x32\x1d.milvus.proto.schema.DataType\x12\x12\n\nfield_name\x18\x02 \x01(\t\x12\x33\n\x07scalars\x18\x03 \x01(\x0b\x32 .milvus.proto.schema.ScalarFieldH\x00\x12\x33\n\x07vectors\x18\x04 \x01(\x0b\x32 .milvus.proto.schema.VectorFieldH\x00\x12\x10\n\x08\x66ield_id\x18\x05 \x01(\x03\x12\x12\n\nis_dynamic\x18\x06 \x01(\x08\x42\x07\n\x05\x66ield\"w\n\x03IDs\x12\x30\n\x06int_id\x18\x01 \x01(\x0b\x32\x1e.milvus.proto.schema.LongArrayH\x00\x12\x32\n\x06str_id\x18\x02 \x01(\x0b\x32 .milvus.proto.schema.StringArrayH\x00\x42\n\n\x08id_field\"\x86\x02\n\x10SearchResultData\x12\x13\n\x0bnum_queries\x18\x01 \x01(\x03\x12\r\n\x05top_k\x18\x02 \x01(\x03\x12\x33\n\x0b\x66ields_data\x18\x03 \x03(\x0b\x32\x1e.milvus.proto.schema.FieldData\x12\x0e\n\x06scores\x18\x04 \x03(\x02\x12%\n\x03ids\x18\x05 \x01(\x0b\x32\x18.milvus.proto.schema.IDs\x12\r\n\x05topks\x18\x06 \x03(\x03\x12\x15\n\routput_fields\x18\x07 \x03(\t\x12<\n\x14group_by_field_value\x18\x08 \x01(\x0b\x32\x1e.milvus.proto.schema.FieldData\"Y\n\x14VectorClusteringInfo\x12\r\n\x05\x66ield\x18\x01 \x01(\t\x12\x32\n\x08\x63\x65ntroid\x18\x02 \x01(\x0b\x32 .milvus.proto.schema.VectorField\"%\n\x14ScalarClusteringInfo\x12\r\n\x05\x66ield\x18\x01 \x01(\t\"\xa8\x01\n\x0e\x43lusteringInfo\x12J\n\x17vector_clustering_infos\x18\x01 \x03(\x0b\x32).milvus.proto.schema.VectorClusteringInfo\x12J\n\x17scalar_clustering_infos\x18\x02 \x03(\x0b\x32).milvus.proto.schema.ScalarClusteringInfo*\xd8\x01\n\x08\x44\x61taType\x12\x08\n\x04None\x10\x00\x12\x08\n\x04\x42ool\x10\x01\x12\x08\n\x04Int8\x10\x02\x12\t\n\x05Int16\x10\x03\x12\t\n\x05Int32\x10\x04\x12\t\n\x05Int64\x10\x05\x12\t\n\x05\x46loat\x10\n\x12\n\n\x06\x44ouble\x10\x0b\x12\n\n\x06String\x10\x14\x12\x0b\n\x07VarChar\x10\x15\x12\t\n\x05\x41rray\x10\x16\x12\x08\n\x04JSON\x10\x17\x12\x10\n\x0c\x42inaryVector\x10\x64\x12\x0f\n\x0b\x46loatVector\x10\x65\x12\x11\n\rFloat16Vector\x10\x66\x12\x12\n\x0e\x42\x46loat16Vector\x10g*V\n\nFieldState\x12\x10\n\x0c\x46ieldCreated\x10\x00\x12\x11\n\rFieldCreating\x10\x01\x12\x11\n\rFieldDropping\x10\x02\x12\x10\n\x0c\x46ieldDropped\x10\x03\x42m\n\x0eio.milvus.grpcB\x0bSchemaProtoP\x01Z4github.com/milvus-io/milvus-proto/go-api/v2/schemapb\xa0\x01\x01\xaa\x02\x12Milvus.Client.Grpcb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0cschema.proto\x12\x13milvus.proto.schema\x1a\x0c\x63ommon.proto\x1a google/protobuf/descriptor.proto\"\xf2\x03\n\x0b\x46ieldSchema\x12\x0f\n\x07\x66ieldID\x18\x01 \x01(\x03\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x16\n\x0eis_primary_key\x18\x03 \x01(\x08\x12\x13\n\x0b\x64\x65scription\x18\x04 \x01(\t\x12\x30\n\tdata_type\x18\x05 \x01(\x0e\x32\x1d.milvus.proto.schema.DataType\x12\x36\n\x0btype_params\x18\x06 \x03(\x0b\x32!.milvus.proto.common.KeyValuePair\x12\x37\n\x0cindex_params\x18\x07 \x03(\x0b\x32!.milvus.proto.common.KeyValuePair\x12\x0e\n\x06\x61utoID\x18\x08 \x01(\x08\x12.\n\x05state\x18\t 
\x01(\x0e\x32\x1f.milvus.proto.schema.FieldState\x12\x33\n\x0c\x65lement_type\x18\n \x01(\x0e\x32\x1d.milvus.proto.schema.DataType\x12\x36\n\rdefault_value\x18\x0b \x01(\x0b\x32\x1f.milvus.proto.schema.ValueField\x12\x12\n\nis_dynamic\x18\x0c \x01(\x08\x12\x18\n\x10is_partition_key\x18\r \x01(\x08\x12\x19\n\x11is_clustering_key\x18\x0e \x01(\x08\"\x99\x01\n\x10\x43ollectionSchema\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x13\n\x0b\x64\x65scription\x18\x02 \x01(\t\x12\x12\n\x06\x61utoID\x18\x03 \x01(\x08\x42\x02\x18\x01\x12\x30\n\x06\x66ields\x18\x04 \x03(\x0b\x32 .milvus.proto.schema.FieldSchema\x12\x1c\n\x14\x65nable_dynamic_field\x18\x05 \x01(\x08\"\x19\n\tBoolArray\x12\x0c\n\x04\x64\x61ta\x18\x01 \x03(\x08\"\x18\n\x08IntArray\x12\x0c\n\x04\x64\x61ta\x18\x01 \x03(\x05\"\x19\n\tLongArray\x12\x0c\n\x04\x64\x61ta\x18\x01 \x03(\x03\"\x1a\n\nFloatArray\x12\x0c\n\x04\x64\x61ta\x18\x01 \x03(\x02\"\x1b\n\x0b\x44oubleArray\x12\x0c\n\x04\x64\x61ta\x18\x01 \x03(\x01\"\x1a\n\nBytesArray\x12\x0c\n\x04\x64\x61ta\x18\x01 \x03(\x0c\"\x1b\n\x0bStringArray\x12\x0c\n\x04\x64\x61ta\x18\x01 \x03(\t\"q\n\nArrayArray\x12.\n\x04\x64\x61ta\x18\x01 \x03(\x0b\x32 .milvus.proto.schema.ScalarField\x12\x33\n\x0c\x65lement_type\x18\x02 \x01(\x0e\x32\x1d.milvus.proto.schema.DataType\"\x19\n\tJSONArray\x12\x0c\n\x04\x64\x61ta\x18\x01 \x03(\x0c\"\xac\x01\n\nValueField\x12\x13\n\tbool_data\x18\x01 \x01(\x08H\x00\x12\x12\n\x08int_data\x18\x02 \x01(\x05H\x00\x12\x13\n\tlong_data\x18\x03 \x01(\x03H\x00\x12\x14\n\nfloat_data\x18\x04 \x01(\x02H\x00\x12\x15\n\x0b\x64ouble_data\x18\x05 \x01(\x01H\x00\x12\x15\n\x0bstring_data\x18\x06 \x01(\tH\x00\x12\x14\n\nbytes_data\x18\x07 \x01(\x0cH\x00\x42\x06\n\x04\x64\x61ta\"\xfe\x03\n\x0bScalarField\x12\x33\n\tbool_data\x18\x01 \x01(\x0b\x32\x1e.milvus.proto.schema.BoolArrayH\x00\x12\x31\n\x08int_data\x18\x02 \x01(\x0b\x32\x1d.milvus.proto.schema.IntArrayH\x00\x12\x33\n\tlong_data\x18\x03 \x01(\x0b\x32\x1e.milvus.proto.schema.LongArrayH\x00\x12\x35\n\nfloat_data\x18\x04 \x01(\x0b\x32\x1f.milvus.proto.schema.FloatArrayH\x00\x12\x37\n\x0b\x64ouble_data\x18\x05 \x01(\x0b\x32 .milvus.proto.schema.DoubleArrayH\x00\x12\x37\n\x0bstring_data\x18\x06 \x01(\x0b\x32 .milvus.proto.schema.StringArrayH\x00\x12\x35\n\nbytes_data\x18\x07 \x01(\x0b\x32\x1f.milvus.proto.schema.BytesArrayH\x00\x12\x35\n\narray_data\x18\x08 \x01(\x0b\x32\x1f.milvus.proto.schema.ArrayArrayH\x00\x12\x33\n\tjson_data\x18\t \x01(\x0b\x32\x1e.milvus.proto.schema.JSONArrayH\x00\x42\x06\n\x04\x64\x61ta\"\xa9\x01\n\x0bVectorField\x12\x0b\n\x03\x64im\x18\x01 \x01(\x03\x12\x37\n\x0c\x66loat_vector\x18\x02 \x01(\x0b\x32\x1f.milvus.proto.schema.FloatArrayH\x00\x12\x17\n\rbinary_vector\x18\x03 \x01(\x0cH\x00\x12\x18\n\x0e\x66loat16_vector\x18\x04 \x01(\x0cH\x00\x12\x19\n\x0f\x62\x66loat16_vector\x18\x05 \x01(\x0cH\x00\x42\x06\n\x04\x64\x61ta\"\xe5\x01\n\tFieldData\x12+\n\x04type\x18\x01 \x01(\x0e\x32\x1d.milvus.proto.schema.DataType\x12\x12\n\nfield_name\x18\x02 \x01(\t\x12\x33\n\x07scalars\x18\x03 \x01(\x0b\x32 .milvus.proto.schema.ScalarFieldH\x00\x12\x33\n\x07vectors\x18\x04 \x01(\x0b\x32 .milvus.proto.schema.VectorFieldH\x00\x12\x10\n\x08\x66ield_id\x18\x05 \x01(\x03\x12\x12\n\nis_dynamic\x18\x06 \x01(\x08\x42\x07\n\x05\x66ield\"w\n\x03IDs\x12\x30\n\x06int_id\x18\x01 \x01(\x0b\x32\x1e.milvus.proto.schema.LongArrayH\x00\x12\x32\n\x06str_id\x18\x02 \x01(\x0b\x32 .milvus.proto.schema.StringArrayH\x00\x42\n\n\x08id_field\"\x86\x02\n\x10SearchResultData\x12\x13\n\x0bnum_queries\x18\x01 \x01(\x03\x12\r\n\x05top_k\x18\x02 
\x01(\x03\x12\x33\n\x0b\x66ields_data\x18\x03 \x03(\x0b\x32\x1e.milvus.proto.schema.FieldData\x12\x0e\n\x06scores\x18\x04 \x03(\x02\x12%\n\x03ids\x18\x05 \x01(\x0b\x32\x18.milvus.proto.schema.IDs\x12\r\n\x05topks\x18\x06 \x03(\x03\x12\x15\n\routput_fields\x18\x07 \x03(\t\x12<\n\x14group_by_field_value\x18\x08 \x01(\x0b\x32\x1e.milvus.proto.schema.FieldData\"Y\n\x14VectorClusteringInfo\x12\r\n\x05\x66ield\x18\x01 \x01(\t\x12\x32\n\x08\x63\x65ntroid\x18\x02 \x01(\x0b\x32 .milvus.proto.schema.VectorField\"%\n\x14ScalarClusteringInfo\x12\r\n\x05\x66ield\x18\x01 \x01(\t\"\xa8\x01\n\x0e\x43lusteringInfo\x12J\n\x17vector_clustering_infos\x18\x01 \x03(\x0b\x32).milvus.proto.schema.VectorClusteringInfo\x12J\n\x17scalar_clustering_infos\x18\x02 \x03(\x0b\x32).milvus.proto.schema.ScalarClusteringInfo*\xd8\x01\n\x08\x44\x61taType\x12\x08\n\x04None\x10\x00\x12\x08\n\x04\x42ool\x10\x01\x12\x08\n\x04Int8\x10\x02\x12\t\n\x05Int16\x10\x03\x12\t\n\x05Int32\x10\x04\x12\t\n\x05Int64\x10\x05\x12\t\n\x05\x46loat\x10\n\x12\n\n\x06\x44ouble\x10\x0b\x12\n\n\x06String\x10\x14\x12\x0b\n\x07VarChar\x10\x15\x12\t\n\x05\x41rray\x10\x16\x12\x08\n\x04JSON\x10\x17\x12\x10\n\x0c\x42inaryVector\x10\x64\x12\x0f\n\x0b\x46loatVector\x10\x65\x12\x11\n\rFloat16Vector\x10\x66\x12\x12\n\x0e\x42\x46loat16Vector\x10g*V\n\nFieldState\x12\x10\n\x0c\x46ieldCreated\x10\x00\x12\x11\n\rFieldCreating\x10\x01\x12\x11\n\rFieldDropping\x10\x02\x12\x10\n\x0c\x46ieldDropped\x10\x03\x42m\n\x0eio.milvus.grpcB\x0bSchemaProtoP\x01Z4github.com/milvus-io/milvus-proto/go-api/v2/schemapb\xa0\x01\x01\xaa\x02\x12Milvus.Client.Grpcb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -26,48 +26,48 @@ _globals['DESCRIPTOR']._serialized_options = b'\n\016io.milvus.grpcB\013SchemaProtoP\001Z4github.com/milvus-io/milvus-proto/go-api/v2/schemapb\240\001\001\252\002\022Milvus.Client.Grpc' _globals['_COLLECTIONSCHEMA'].fields_by_name['autoID']._options = None _globals['_COLLECTIONSCHEMA'].fields_by_name['autoID']._serialized_options = b'\030\001' - _globals['_DATATYPE']._serialized_start=2831 - _globals['_DATATYPE']._serialized_end=3047 - _globals['_FIELDSTATE']._serialized_start=3049 - _globals['_FIELDSTATE']._serialized_end=3135 + _globals['_DATATYPE']._serialized_start=2858 + _globals['_DATATYPE']._serialized_end=3074 + _globals['_FIELDSTATE']._serialized_start=3076 + _globals['_FIELDSTATE']._serialized_end=3162 _globals['_FIELDSCHEMA']._serialized_start=86 - _globals['_FIELDSCHEMA']._serialized_end=557 - _globals['_COLLECTIONSCHEMA']._serialized_start=560 - _globals['_COLLECTIONSCHEMA']._serialized_end=713 - _globals['_BOOLARRAY']._serialized_start=715 - _globals['_BOOLARRAY']._serialized_end=740 - _globals['_INTARRAY']._serialized_start=742 - _globals['_INTARRAY']._serialized_end=766 - _globals['_LONGARRAY']._serialized_start=768 - _globals['_LONGARRAY']._serialized_end=793 - _globals['_FLOATARRAY']._serialized_start=795 - _globals['_FLOATARRAY']._serialized_end=821 - _globals['_DOUBLEARRAY']._serialized_start=823 - _globals['_DOUBLEARRAY']._serialized_end=850 - _globals['_BYTESARRAY']._serialized_start=852 - _globals['_BYTESARRAY']._serialized_end=878 - _globals['_STRINGARRAY']._serialized_start=880 - _globals['_STRINGARRAY']._serialized_end=907 - _globals['_ARRAYARRAY']._serialized_start=909 - _globals['_ARRAYARRAY']._serialized_end=1022 - _globals['_JSONARRAY']._serialized_start=1024 - _globals['_JSONARRAY']._serialized_end=1049 - _globals['_VALUEFIELD']._serialized_start=1052 - 
_globals['_VALUEFIELD']._serialized_end=1224 - _globals['_SCALARFIELD']._serialized_start=1227 - _globals['_SCALARFIELD']._serialized_end=1737 - _globals['_VECTORFIELD']._serialized_start=1740 - _globals['_VECTORFIELD']._serialized_end=1909 - _globals['_FIELDDATA']._serialized_start=1912 - _globals['_FIELDDATA']._serialized_end=2141 - _globals['_IDS']._serialized_start=2143 - _globals['_IDS']._serialized_end=2262 - _globals['_SEARCHRESULTDATA']._serialized_start=2265 - _globals['_SEARCHRESULTDATA']._serialized_end=2527 - _globals['_VECTORCLUSTERINGINFO']._serialized_start=2529 - _globals['_VECTORCLUSTERINGINFO']._serialized_end=2618 - _globals['_SCALARCLUSTERINGINFO']._serialized_start=2620 - _globals['_SCALARCLUSTERINGINFO']._serialized_end=2657 - _globals['_CLUSTERINGINFO']._serialized_start=2660 - _globals['_CLUSTERINGINFO']._serialized_end=2828 + _globals['_FIELDSCHEMA']._serialized_end=584 + _globals['_COLLECTIONSCHEMA']._serialized_start=587 + _globals['_COLLECTIONSCHEMA']._serialized_end=740 + _globals['_BOOLARRAY']._serialized_start=742 + _globals['_BOOLARRAY']._serialized_end=767 + _globals['_INTARRAY']._serialized_start=769 + _globals['_INTARRAY']._serialized_end=793 + _globals['_LONGARRAY']._serialized_start=795 + _globals['_LONGARRAY']._serialized_end=820 + _globals['_FLOATARRAY']._serialized_start=822 + _globals['_FLOATARRAY']._serialized_end=848 + _globals['_DOUBLEARRAY']._serialized_start=850 + _globals['_DOUBLEARRAY']._serialized_end=877 + _globals['_BYTESARRAY']._serialized_start=879 + _globals['_BYTESARRAY']._serialized_end=905 + _globals['_STRINGARRAY']._serialized_start=907 + _globals['_STRINGARRAY']._serialized_end=934 + _globals['_ARRAYARRAY']._serialized_start=936 + _globals['_ARRAYARRAY']._serialized_end=1049 + _globals['_JSONARRAY']._serialized_start=1051 + _globals['_JSONARRAY']._serialized_end=1076 + _globals['_VALUEFIELD']._serialized_start=1079 + _globals['_VALUEFIELD']._serialized_end=1251 + _globals['_SCALARFIELD']._serialized_start=1254 + _globals['_SCALARFIELD']._serialized_end=1764 + _globals['_VECTORFIELD']._serialized_start=1767 + _globals['_VECTORFIELD']._serialized_end=1936 + _globals['_FIELDDATA']._serialized_start=1939 + _globals['_FIELDDATA']._serialized_end=2168 + _globals['_IDS']._serialized_start=2170 + _globals['_IDS']._serialized_end=2289 + _globals['_SEARCHRESULTDATA']._serialized_start=2292 + _globals['_SEARCHRESULTDATA']._serialized_end=2554 + _globals['_VECTORCLUSTERINGINFO']._serialized_start=2556 + _globals['_VECTORCLUSTERINGINFO']._serialized_end=2645 + _globals['_SCALARCLUSTERINGINFO']._serialized_start=2647 + _globals['_SCALARCLUSTERINGINFO']._serialized_end=2684 + _globals['_CLUSTERINGINFO']._serialized_start=2687 + _globals['_CLUSTERINGINFO']._serialized_end=2855 # @@protoc_insertion_point(module_scope) diff --git a/pymilvus/grpc_gen/schema_pb2.pyi b/pymilvus/grpc_gen/schema_pb2.pyi index 650ebfdf7..f39d7e59e 100644 --- a/pymilvus/grpc_gen/schema_pb2.pyi +++ b/pymilvus/grpc_gen/schema_pb2.pyi @@ -55,7 +55,7 @@ FieldDropping: FieldState FieldDropped: FieldState class FieldSchema(_message.Message): - __slots__ = ("fieldID", "name", "is_primary_key", "description", "data_type", "type_params", "index_params", "autoID", "state", "element_type", "default_value", "is_dynamic", "is_partition_key") + __slots__ = ("fieldID", "name", "is_primary_key", "description", "data_type", "type_params", "index_params", "autoID", "state", "element_type", "default_value", "is_dynamic", "is_partition_key", "is_clustering_key") FIELDID_FIELD_NUMBER: 
_ClassVar[int] NAME_FIELD_NUMBER: _ClassVar[int] IS_PRIMARY_KEY_FIELD_NUMBER: _ClassVar[int] @@ -69,6 +69,7 @@ class FieldSchema(_message.Message): DEFAULT_VALUE_FIELD_NUMBER: _ClassVar[int] IS_DYNAMIC_FIELD_NUMBER: _ClassVar[int] IS_PARTITION_KEY_FIELD_NUMBER: _ClassVar[int] + IS_CLUSTERING_KEY_FIELD_NUMBER: _ClassVar[int] fieldID: int name: str is_primary_key: bool @@ -82,7 +83,8 @@ class FieldSchema(_message.Message): default_value: ValueField is_dynamic: bool is_partition_key: bool - def __init__(self, fieldID: _Optional[int] = ..., name: _Optional[str] = ..., is_primary_key: bool = ..., description: _Optional[str] = ..., data_type: _Optional[_Union[DataType, str]] = ..., type_params: _Optional[_Iterable[_Union[_common_pb2.KeyValuePair, _Mapping]]] = ..., index_params: _Optional[_Iterable[_Union[_common_pb2.KeyValuePair, _Mapping]]] = ..., autoID: bool = ..., state: _Optional[_Union[FieldState, str]] = ..., element_type: _Optional[_Union[DataType, str]] = ..., default_value: _Optional[_Union[ValueField, _Mapping]] = ..., is_dynamic: bool = ..., is_partition_key: bool = ...) -> None: ... + is_clustering_key: bool + def __init__(self, fieldID: _Optional[int] = ..., name: _Optional[str] = ..., is_primary_key: bool = ..., description: _Optional[str] = ..., data_type: _Optional[_Union[DataType, str]] = ..., type_params: _Optional[_Iterable[_Union[_common_pb2.KeyValuePair, _Mapping]]] = ..., index_params: _Optional[_Iterable[_Union[_common_pb2.KeyValuePair, _Mapping]]] = ..., autoID: bool = ..., state: _Optional[_Union[FieldState, str]] = ..., element_type: _Optional[_Union[DataType, str]] = ..., default_value: _Optional[_Union[ValueField, _Mapping]] = ..., is_dynamic: bool = ..., is_partition_key: bool = ..., is_clustering_key: bool = ...) -> None: ... 
class CollectionSchema(_message.Message): __slots__ = ("name", "description", "autoID", "fields", "enable_dynamic_field") diff --git a/pymilvus/milvus_client/index.py b/pymilvus/milvus_client/index.py new file mode 100644 index 000000000..1eb2c5a5e --- /dev/null +++ b/pymilvus/milvus_client/index.py @@ -0,0 +1,51 @@ +class IndexParam: + def __init__(self, field_name: str, index_type: str, index_name: str, **kwargs): + self._field_name = field_name + self._index_name = index_name + self._index_type = index_type + self._kwargs = kwargs + + @property + def field_name(self): + return self._field_name + + @property + def index_name(self): + return self._index_name + + @property + def index_type(self): + return self._index_type + + def __iter__(self): + yield "field_name", self._field_name + yield "index_name", self._index_name + yield from self._kwargs.items() + + def __str__(self): + return str(dict(self)) + + def __eq__(self, other: object): + if isinstance(other, self.__class__): + return dict(self) == dict(other) + + if isinstance(other, dict): + return dict(self) == other + return False + + +class IndexParams: + def __init__(self): + self._indexes = {} + + def add_index(self, field_name: str, index_type: str = "", index_name: str = "", **kwargs): + index_param = IndexParam(field_name, index_type, index_name, **kwargs) + pair_key = (field_name, index_name) + self._indexes[pair_key] = index_param + + def __iter__(self): + for v in self._indexes.values(): + yield dict(v) + + def __str__(self): + return str(list(self)) diff --git a/pymilvus/milvus_client/milvus_client.py b/pymilvus/milvus_client/milvus_client.py index e6225672d..a8b668862 100644 --- a/pymilvus/milvus_client/milvus_client.py +++ b/pymilvus/milvus_client/milvus_client.py @@ -1,10 +1,11 @@ """MilvusClient for dealing with simple workflows.""" + import logging from typing import Dict, List, Optional, Union from uuid import uuid4 from pymilvus.client.constants import DEFAULT_CONSISTENCY_LEVEL -from pymilvus.client.types import ExceptionsMessage +from pymilvus.client.types import ExceptionsMessage, LoadState from pymilvus.exceptions import ( DataTypeNotMatchException, MilvusException, @@ -16,6 +17,8 @@ from pymilvus.orm.connections import connections from pymilvus.orm.types import DataType +from .index import IndexParams + logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -62,6 +65,37 @@ def __init__( self.is_self_hosted = bool(utility.get_server_type(using=self._using) == "milvus") def create_collection( + self, + collection_name: str, + dimension: Optional[int] = None, + primary_field_name: str = "id", # default is "id" + id_type: str = "int", # or "string", + vector_field_name: str = "vector", # default is "vector" + metric_type: str = "IP", + auto_id: bool = False, + timeout: Optional[float] = None, + schema: Optional[CollectionSchema] = None, + index_params: Optional[IndexParams] = None, + **kwargs, + ): + if schema is None: + return self._fast_create_collection( + collection_name, + dimension, + primary_field_name=primary_field_name, + id_type=id_type, + vector_field_name=vector_field_name, + metric_type=metric_type, + auto_id=auto_id, + timeout=timeout, + **kwargs, + ) + + return self._create_collection_with_schema( + collection_name, schema, index_params, timeout=timeout, **kwargs + ) + + def _fast_create_collection( self, collection_name: str, dimension: int, @@ -103,21 +137,43 @@ def create_collection( except Exception as ex: logger.error("Failed to create collection: %s", collection_name) raise ex from ex
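+        # note: the dict-style index params used previously are replaced by the
+        # IndexParams container defined in pymilvus/milvus_client/index.py above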
- index_params = {"metric_type": metric_type, "params": {}} - self._create_index(collection_name, vector_field_name, index_params, timeout=timeout) - self._load(collection_name, timeout=timeout) + + index_params = self.prepare_index_params() + index_type = "" + index_name = "" + index_params.add_index( + vector_field_name, index_type, index_name, metric_type=metric_type, params={} + ) + self.create_index(collection_name, index_params, timeout=timeout) + self.load_collection(collection_name, timeout=timeout) + + def create_index( + self, + collection_name: str, + index_params: IndexParams, + timeout: Optional[float] = None, + **kwargs, + ): + for index_param in index_params: + self._create_index(collection_name, index_param, timeout=timeout, **kwargs) def _create_index( self, collection_name: str, - vec_field_name: str, - index_params: Dict, + index_param: Dict, timeout: Optional[float] = None, - ) -> None: - """Create a index on the collection""" + **kwargs, + ): conn = self._get_connection() try: - conn.create_index(collection_name, vec_field_name, index_params, timeout=timeout) + conn.create_index( + collection_name, + index_param["field_name"], + index_param.get("params", {}), + index_name=index_param.get("index_name", ""), + timeout=timeout, + **kwargs, + ) logger.debug("Successfully created an index on collection: %s", collection_name) except Exception as ex: logger.error("Failed to create an index on collection: %s", collection_name) @@ -127,11 +183,10 @@ def insert( self, collection_name: str, data: Union[Dict, List[Dict]], - batch_size: int = 0, - progress_bar: bool = False, timeout: Optional[float] = None, + partition_name: Optional[str] = "", **kwargs, - ) -> List[Union[str, int]]: + ) -> Dict: """Insert data into the collection. If the Milvus Client was initiated without an existing Collection, the first dict passed @@ -142,49 +197,74 @@ cast to list. timeout (float, optional): The timeout to use, will override init timeout. Defaults to None. - batch_size (int, optional): The batch size to perform inputs with. Defaults to 0, - which means not batch the input. - progress_bar (bool, optional): Whether to display a progress bar for the input. - Defaults to False. Raises: DataNotMatchException: If the data has missing fields an exception will be thrown. MilvusException: General Milvus error on insert. Returns: - List[Union[str, int]]: A list of primary keys that were inserted. + Dict: Number of rows that were inserted. """ # If no data provided, we cannot input anything if isinstance(data, Dict): data = [data] if len(data) == 0: - return [] + return {"insert_count": 0} + + conn = self._get_connection() + # Insert into the collection. + try: + res = conn.insert_rows( + collection_name, data, partition_name=partition_name, timeout=timeout + ) + except Exception as ex: + raise ex from ex + return {"insert_count": res.insert_count} - if batch_size < 0: - logger.error("Invalid batch size provided for insert.") - msg = "Invalid batch size provided for insert." - raise ValueError(msg) + def upsert( + self, + collection_name: str, + data: Union[Dict, List[Dict]], + timeout: Optional[float] = None, + partition_name: Optional[str] = "", + **kwargs, + ) -> Dict: + """Upsert data into the collection. + + If the Milvus Client was initiated without an existing Collection, the first dict passed + in will be used to initiate the collection. + + Args: + data (List[Dict[str, any]]): A list of dicts to pass in. If list not provided, will + cast to list.
@@ -195,8 +275,9 @@
         output_fields: Optional[List[str]] = None,
         search_params: Optional[dict] = None,
         timeout: Optional[float] = None,
+        partition_names: Optional[List[str]] = None,
         **kwargs,
-    ) -> List[dict]:
+    ) -> List[List[dict]]:
         """Search for a query vector/vectors.
 
         In order for the search to process, a collection needs to have been either provided
@@ -216,7 +297,7 @@
             ValueError: The collection being searched doesnt exist. Need to insert data first.
 
         Returns:
-            List[dict]: A list of dicts containing the score and the result data. Embeddings are
+            List[List[dict]]: A nested list of dicts containing the result data. Embeddings are
                 not included in the result data.
         """
@@ -230,6 +311,7 @@
                 expression=filter,
                 limit=limit,
                 output_fields=output_fields,
+                partition_names=partition_names,
                 timeout=timeout,
                 **kwargs,
             )
@@ -249,15 +331,17 @@ def query(
         self,
         collection_name: str,
-        filter: str,
+        filter: str = "",
         output_fields: Optional[List[str]] = None,
         timeout: Optional[float] = None,
+        ids: Optional[Union[List, str, int]] = None,
+        partition_names: Optional[List[str]] = None,
         **kwargs,
     ) -> List[dict]:
         """Query for entries in the Collection.
 
         Args:
-            filter_expression (str): The filter to use for the query.
+            filter (str): The filter to use for the query.
             return_fields (List[str], optional): List of which field values to return. If None
                 specified, all fields excluding vector field will be returned.
             partitions (List[str], optional): Which partitions to perform query. Defaults to None.
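Since search now returns one inner list of hits per query vector (hence List[List[dict]]), result handling is nested; a sketch reusing the client above, with illustrative random vectors in the style of the example scripts:

import numpy as np

rng = np.random.default_rng(seed=19530)
results = client.search(
    "demo",
    rng.random((2, 8)),        # two query vectors -> two inner hit lists
    limit=3,
    output_fields=["tag"],
    partition_names=None,      # new knob: restrict the search to given partitions
)
for i, hits in enumerate(results):
    for hit in hits:
        print(i, hit)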
""" - if not isinstance(filter, str): + if filter and not isinstance(filter, str): raise DataTypeNotMatchException(message=ExceptionsMessage.ExprType % type(filter)) + if filter and ids is not None: + raise ParamError(message=ExceptionsMessage.AmbiguousDeleteFilterParam) + + if isinstance(ids, (int, str)): + ids = [ids] + conn = self._get_connection() try: schema_dict = conn.describe_collection(collection_name, timeout=timeout, **kwargs) @@ -280,6 +370,9 @@ def query( logger.error("Failed to describe collection: %s", collection_name) raise ex from ex + if ids: + filter = self._pack_pks_expr(schema_dict, ids) + if not output_fields: output_fields = ["*"] vec_field_name = self._get_vector_field_name(schema_dict) @@ -288,7 +381,12 @@ def query( try: res = conn.query( - collection_name, expr=filter, output_fields=output_fields, timeout=timeout, **kwargs + collection_name, + expr=filter, + output_fields=output_fields, + partition_names=partition_names, + timeout=timeout, + **kwargs, ) except Exception as ex: logger.error("Failed to query collection: %s", collection_name) @@ -302,6 +400,7 @@ def get( ids: Union[list, str, int], output_fields: Optional[List[str]] = None, timeout: Optional[float] = None, + partition_names: Optional[List[str]] = None, **kwargs, ) -> List[dict]: """Grab the inserted vectors using the primary key from the Collection. @@ -342,7 +441,12 @@ def get( expr = self._pack_pks_expr(schema_dict, ids) try: res = conn.query( - collection_name, expr=expr, output_fields=output_fields, timeout=timeout, **kwargs + collection_name, + expr=expr, + output_fields=output_fields, + partition_names=partition_names, + timeout=timeout, + **kwargs, ) except Exception as ex: logger.error("Failed to get collection: %s", collection_name) @@ -353,11 +457,12 @@ def get( def delete( self, collection_name: str, - pks: Optional[Union[list, str, int]] = None, + ids: Optional[Union[list, str, int]] = None, timeout: Optional[float] = None, filter: Optional[str] = "", + partition_name: Optional[str] = "", **kwargs, - ): + ) -> Dict: """Delete entries in the collection by their pk. Delete all the entries based on the pk. If unsure of pk you can first query the collection @@ -373,16 +478,25 @@ def delete( Milvus(previous 2.3.2) is not empty, the list of primary keys is still returned. Args: - pks (list, str, int): The pk's to delete. Depending on pk_field type it can be int + ids (list, str, int): The pk's to delete. Depending on pk_field type it can be int or str or alist of either. Default to None. filter(str, optional): A filter to use for the deletion. Defaults to empty. timeout (int, optional): Timeout to use, overides the client level assigned at init. Defaults to None. - """ + Returns: + Dict: Number of rows that were deleted. + """ + pks = kwargs.get("pks", []) if isinstance(pks, (int, str)): pks = [pks] + if ids: + if isinstance(ids, (int, str)): + pks.append(ids) + elif isinstance(ids, list): + pks.extend(ids) + expr = "" conn = self._get_connection() if pks: @@ -405,7 +519,7 @@ def delete( ret_pks = [] try: - res = conn.delete(collection_name, expr, timeout=timeout, **kwargs) + res = conn.delete(collection_name, expr, partition_name, timeout=timeout, **kwargs) if res.primary_keys: ret_pks.extend(res.primary_keys) except Exception as ex: @@ -414,54 +528,43 @@ def delete( if ret_pks: return ret_pks - return None - def num_entities(self, collection_name: str, timeout: Optional[float] = None) -> int: - """return the number of rows in the collection. 
+ return {"delete_count": res.delete_count} - Returns: - int: Number for rows. - """ + def get_collection_stats(self, collection_name: str, timeout: Optional[float] = None) -> Dict: conn = self._get_connection() stats = conn.get_collection_stats(collection_name, timeout=timeout) result = {stat.key: stat.value for stat in stats} result["row_count"] = int(result["row_count"]) - return result["row_count"] + return result - def flush(self, collection_name: str, timeout: Optional[float] = None, **kwargs): - """Seal all segments in the collection. Inserts after flushing will be written into - new segments. Only sealed segments can be indexed. - - Args: - timeout (float): an optional duration of time in seconds to allow for the RPCs. - If timeout is not set, the client keeps waiting until the server responds - or an error occurs. - """ + def describe_collection(self, collection_name: str, timeout: Optional[float] = None, **kwargs): conn = self._get_connection() - conn.flush([collection_name], timeout=timeout, **kwargs) + return conn.describe_collection(collection_name, timeout=timeout, **kwargs) - def describe_collection(self, collection_name: str, **kwargs): + def has_collection(self, collection_name: str, timeout: Optional[float] = None, **kwargs): conn = self._get_connection() - try: - schema_dict = conn.describe_collection(collection_name, **kwargs) - except Exception as ex: - logger.error("Failed to describe collection: %s", collection_name) - raise ex from ex - return schema_dict + return conn.has_collection(collection_name, timeout=timeout, **kwargs) def list_collections(self, **kwargs): conn = self._get_connection() - try: - collection_names = conn.list_collections(**kwargs) - except Exception as ex: - logger.error("Failed to list collections") - raise ex from ex - return collection_names + return conn.list_collections(**kwargs) - def drop_collection(self, collection_name: str): + def drop_collection(self, collection_name: str, timeout: Optional[float] = None, **kwargs): """Delete the collection stored in this object""" conn = self._get_connection() - conn.drop_collection(collection_name) + conn.drop_collection(collection_name, timeout=timeout, **kwargs) + + def rename_collection( + self, + old_name: str, + new_name: str, + target_db: Optional[str] = "", + timeout: Optional[float] = None, + **kwargs, + ): + conn = self._get_connection() + conn.rename_collections(old_name, new_name, target_db, timeout=timeout, **kwargs) @classmethod def create_schema(cls, **kwargs): @@ -469,34 +572,14 @@ def create_schema(cls, **kwargs): return CollectionSchema([], **kwargs) @classmethod - def prepare_index_params( - cls, - field_name: str, - index_type: Optional[str] = None, - metric_type: Optional[str] = None, - index_name: str = "", - params: Optional[Dict] = None, - **kwargs, - ): - index_params = {"field_name": field_name} - if index_type is not None: - index_params["index_type"] = index_type - if metric_type: - index_params["metric_type"] = metric_type - if index_name: - index_params["index_name"] = index_name - - index_params["params"] = params or {} + def prepare_index_params(cls): + return IndexParams() - index_params.update(**kwargs) - - return index_params - - def create_collection_with_schema( + def _create_collection_with_schema( self, collection_name: str, schema: CollectionSchema, - index_params: Dict, + index_params: IndexParams, timeout: Optional[float] = None, **kwargs, ): @@ -507,12 +590,6 @@ def create_collection_with_schema( schema.enable_dynamic_field = True schema.verify() - 
@@ -523,8 +600,9 @@
             logger.error("Failed to create collection: %s", collection_name)
             raise ex from ex
 
-        self._create_index(collection_name, vector_field_name, index_params, timeout=timeout)
-        self._load(collection_name, timeout=timeout)
+        if index_params:
+            self.create_index(collection_name, index_params, timeout=timeout)
+        self.load_collection(collection_name, timeout=timeout)
 
     def close(self):
         connections.disconnect(self._using)
@@ -588,11 +666,301 @@ def _pack_pks_expr(self, schema_dict: Dict, pks: List) -> str:
         expr = f"{pk_field_name} in [{','.join(ids)}]"
         return expr
 
-    def _load(self, collection_name: str, timeout: Optional[float] = None):
+    def load_collection(self, collection_name: str, timeout: Optional[float] = None, **kwargs):
         """Loads the collection."""
         conn = self._get_connection()
         try:
-            conn.load_collection(collection_name, timeout=timeout)
+            conn.load_collection(collection_name, timeout=timeout, **kwargs)
+        except MilvusException as ex:
+            logger.error(
+                "Failed to load collection: %s",
+                collection_name,
+            )
+            raise ex from ex
+
+    def release_collection(self, collection_name: str, timeout: Optional[float] = None, **kwargs):
+        conn = self._get_connection()
+        try:
+            conn.release_collection(collection_name, timeout=timeout, **kwargs)
         except MilvusException as ex:
-            logger.error("Failed to load collection: %s", collection_name)
+            logger.error(
+                "Failed to release collection: %s",
+                collection_name,
+            )
             raise ex from ex
+
+    def get_load_state(
+        self,
+        collection_name: str,
+        partition_name: Optional[str] = "",
+        timeout: Optional[float] = None,
+        **kwargs,
+    ) -> Dict:
+        conn = self._get_connection()
+        partition_names = None
+        if partition_name:
+            partition_names = [partition_name]
+        try:
+            state = conn.get_load_state(collection_name, partition_names, timeout=timeout, **kwargs)
+        except Exception as ex:
+            raise ex from ex
+
+        ret = {"state": state}
+        if state == LoadState.Loading:
+            progress = conn.get_loading_progress(collection_name, partition_names, timeout=timeout)
+            ret["progress"] = progress
+
+        return ret
+
+    def refresh_load(self, collection_name: str, timeout: Optional[float] = None, **kwargs):
+        kwargs.pop("_refresh", None)
+        conn = self._get_connection()
+        conn.load_collection(collection_name, timeout=timeout, _refresh=True, **kwargs)
+
+    def list_indexes(self, collection_name: str, field_name: Optional[str] = "", **kwargs):
+        """List all indexes of collection. If `field_name` is not specified,
+        return all the indexes of this collection, otherwise this interface will return
+        all indexes on this field of the collection.
+
+        :param collection_name: The name of collection.
+        :type collection_name: str
+
+        :param field_name: The name of field. If no field name is specified, all indexes
+            of this collection will be returned.
+
+        :return: The name list of all indexes.
+        :rtype: str list
+        """
+        conn = self._get_connection()
+        indexes = conn.list_indexes(collection_name, **kwargs)
+        index_name_list = []
+        for index in indexes:
+            if not index:
+                continue
+            if not field_name or index.field_name == field_name:
+                index_name_list.append(index.index_name)
+        return index_name_list
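The private _load helper becomes the public load_collection/release_collection pair, with get_load_state reporting progress while a load is in flight. A sketch (LoadState comes from the import this patch adds at the top of the file):

from pymilvus.client.types import LoadState

client.release_collection("demo")
client.load_collection("demo")

state = client.get_load_state("demo")
if state["state"] == LoadState.Loading:
    print(state["progress"])                 # only present while loading

print(client.list_indexes("demo"))           # all index names, or filter by field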
+    def drop_index(
+        self,
+        collection_name: str,
+        index_name: str,
+        timeout: Optional[float] = None,
+        **kwargs,
+    ):
+        conn = self._get_connection()
+        conn.drop_index(collection_name, "", index_name, timeout=timeout, **kwargs)
+
+    def describe_index(
+        self,
+        collection_name: str,
+        index_name: str,
+        timeout: Optional[float] = None,
+        **kwargs,
+    ) -> Dict:
+        conn = self._get_connection()
+        return conn.describe_index(collection_name, index_name, timeout=timeout, **kwargs)
+
+    def create_partition(
+        self,
+        collection_name: str,
+        partition_name: str,
+        timeout: Optional[float] = None,
+        **kwargs,
+    ):
+        conn = self._get_connection()
+        conn.create_partition(collection_name, partition_name, timeout=timeout, **kwargs)
+
+    def drop_partition(
+        self,
+        collection_name: str,
+        partition_name: str,
+        timeout: Optional[float] = None,
+        **kwargs,
+    ):
+        conn = self._get_connection()
+        conn.drop_partition(collection_name, partition_name, timeout=timeout, **kwargs)
+
+    def has_partition(
+        self,
+        collection_name: str,
+        partition_name: str,
+        timeout: Optional[float] = None,
+        **kwargs,
+    ) -> bool:
+        conn = self._get_connection()
+        return conn.has_partition(collection_name, partition_name, timeout=timeout, **kwargs)
+
+    def list_partitions(
+        self, collection_name: str, timeout: Optional[float] = None, **kwargs
+    ) -> List[str]:
+        conn = self._get_connection()
+        return conn.list_partitions(collection_name, timeout=timeout, **kwargs)
+
+    def load_partitions(
+        self,
+        collection_name: str,
+        partition_names: Union[str, List[str]],
+        timeout: Optional[float] = None,
+        **kwargs,
+    ):
+        if isinstance(partition_names, str):
+            partition_names = [partition_names]
+
+        conn = self._get_connection()
+        conn.load_partitions(collection_name, partition_names, timeout=timeout, **kwargs)
+
+    def release_partitions(
+        self,
+        collection_name: str,
+        partition_names: Union[str, List[str]],
+        timeout: Optional[float] = None,
+        **kwargs,
+    ):
+        if isinstance(partition_names, str):
+            partition_names = [partition_names]
+        conn = self._get_connection()
+        conn.release_partitions(collection_name, partition_names, timeout=timeout, **kwargs)
+
+    def get_partition_stats(
+        self, collection_name: str, partition_name: str, timeout: Optional[float] = None, **kwargs
+    ) -> Dict:
+        conn = self._get_connection()
+        return conn.get_partition_stats(collection_name, partition_name, timeout=timeout, **kwargs)
+
+    def create_user(self, user_name: str, password: str, timeout: Optional[float] = None, **kwargs):
+        conn = self._get_connection()
+        return conn.create_user(user_name, password, timeout=timeout, **kwargs)
+
+    def drop_user(self, user_name: str, timeout: Optional[float] = None, **kwargs):
+        conn = self._get_connection()
+        return conn.delete_user(user_name, timeout=timeout, **kwargs)
+
+    def update_password(
+        self,
+        user_name: str,
+        old_password: str,
+        new_password: str,
+        reset_connection: Optional[bool] = False,
+        timeout: Optional[float] = None,
+        **kwargs,
+    ):
+        conn = self._get_connection()
+        conn.update_password(user_name, old_password, new_password, timeout=timeout, **kwargs)
+        if reset_connection:
+            conn._setup_authorization_interceptor(user_name, new_password, None)
+            conn._setup_grpc_channel()
+
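Partition management mirrors the collection-level calls, and insert/upsert/search/query/delete all take partition arguments now; a short sketch with an illustrative partition name:

client.create_partition("demo", "p1")
client.insert("demo", rows, partition_name="p1")
print(client.list_partitions("demo"))        # e.g. ['_default', 'p1']
client.release_partitions("demo", "p1")      # a str or a list both work
client.drop_partition("demo", "p1")          # release before dropping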
+    def list_users(self, timeout: Optional[float] = None, **kwargs):
+        conn = self._get_connection()
+        return conn.list_usernames(timeout=timeout, **kwargs)
+
+    def describe_user(self, user_name: str, timeout: Optional[float] = None, **kwargs):
+        conn = self._get_connection()
+        try:
+            res = conn.select_one_user(user_name, True, timeout=timeout, **kwargs)
+        except Exception as ex:
+            raise ex from ex
+        if res.groups:
+            item = res.groups[0]
+            return {
+                "user_name": user_name,
+                "roles": item.roles,
+            }
+        return {}
+
+    def grant_role(self, user_name: str, role_name: str, timeout: Optional[float] = None, **kwargs):
+        conn = self._get_connection()
+        conn.add_user_to_role(user_name, role_name, timeout=timeout, **kwargs)
+
+    def revoke_role(
+        self, user_name: str, role_name: str, timeout: Optional[float] = None, **kwargs
+    ):
+        conn = self._get_connection()
+        conn.remove_user_from_role(user_name, role_name, timeout=timeout, **kwargs)
+
+    def create_role(self, role_name: str, timeout: Optional[float] = None, **kwargs):
+        conn = self._get_connection()
+        conn.create_role(role_name, timeout=timeout, **kwargs)
+
+    def drop_role(self, role_name: str, timeout: Optional[float] = None, **kwargs):
+        conn = self._get_connection()
+        conn.drop_role(role_name, timeout=timeout, **kwargs)
+
+    def describe_role(
+        self, role_name: str, timeout: Optional[float] = None, **kwargs
+    ) -> List[Dict]:
+        conn = self._get_connection()
+        db_name = kwargs.pop("db_name", "")
+        try:
+            res = conn.select_grant_for_one_role(role_name, db_name, timeout=timeout, **kwargs)
+        except Exception as ex:
+            raise ex from ex
+        return [dict(i) for i in res.groups]
+
+    def list_roles(self, timeout: Optional[float] = None, **kwargs):
+        conn = self._get_connection()
+        try:
+            res = conn.select_all_role(False, timeout=timeout, **kwargs)
+        except Exception as ex:
+            raise ex from ex
+
+        groups = res.groups
+        return [g.role_name for g in groups]
+
+    def grant_privilege(
+        self,
+        role_name: str,
+        object_type: str,
+        privilege: str,
+        object_name: str,
+        db_name: Optional[str] = "",
+        timeout: Optional[float] = None,
+        **kwargs,
+    ):
+        conn = self._get_connection()
+        conn.grant_privilege(
+            role_name, object_type, object_name, privilege, db_name, timeout=timeout, **kwargs
+        )
+
+    def revoke_privilege(
+        self,
+        role_name: str,
+        object_type: str,
+        privilege: str,
+        object_name: str,
+        db_name: Optional[str] = "",
+        timeout: Optional[float] = None,
+        **kwargs,
+    ):
+        conn = self._get_connection()
+        conn.revoke_privilege(
+            role_name, object_type, object_name, privilege, db_name, timeout=timeout, **kwargs
+        )
+
+    def create_alias(
+        self, collection_name: str, alias: str, timeout: Optional[float] = None, **kwargs
+    ):
+        conn = self._get_connection()
+        conn.create_alias(collection_name, alias, timeout=timeout, **kwargs)
+
+    def drop_alias(self, alias: str, timeout: Optional[float] = None, **kwargs):
+        conn = self._get_connection()
+        conn.drop_alias(alias, timeout=timeout, **kwargs)
+
+    def alter_alias(
+        self, collection_name: str, alias: str, timeout: Optional[float] = None, **kwargs
+    ):
+        conn = self._get_connection()
+        conn.alter_alias(collection_name, alias, timeout=timeout, **kwargs)
+
+    def describe_alias(self, alias: str, timeout: Optional[float] = None, **kwargs) -> Dict:
+        pass
+
+    def list_aliases(self, timeout: Optional[float] = None, **kwargs) -> List[str]:
+        pass
+
+    def using_database(self, db_name: str, **kwargs):
+        conn = self._get_connection()
+        conn.reset_db_name(db_name)
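Finally, the RBAC and alias surfaces compose end to end; note that describe_alias and list_aliases are still stubs in this patch. A sketch (user, role, object, and privilege names are illustrative, and the exact privilege strings depend on the server's RBAC configuration):

client.create_user("reader", "Passw0rd!")
client.create_role("read_only")
client.grant_privilege("read_only", "Collection", "Search", "demo")
client.grant_role("reader", "read_only")
print(client.describe_user("reader"))        # {'user_name': 'reader', 'roles': ...}

client.create_alias("demo", "demo_alias")    # alias usable wherever a name is
client.alter_alias("demo", "demo_alias")     # repoint the alias
client.drop_alias("demo_alias")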