From 4ba030e74c4f0987cb53586806fb74b7e14558ed Mon Sep 17 00:00:00 2001 From: wei liu Date: Wed, 25 Dec 2024 17:16:42 +0800 Subject: [PATCH] enhance: Enable resource group api in milvus client (#2513) issue: milvus-io/milvus#38739 this PR enable those API in milvus client: - create_resource_group - update_resource_groups - drop_resource_group - describe_resource_group - list_resource_groups - transfer_replica cause the resource group prefer to use declarative api, so the transfer_node API is deprecated in milvus client, it's recommand to use update_resource_group to manage node num in resource group. Signed-off-by: Wei Liu --- examples/resource_group.py | 105 ++++++++++++++++++++++++ pymilvus/milvus_client/milvus_client.py | 100 ++++++++++++++++++++++ 2 files changed, 205 insertions(+) create mode 100644 examples/resource_group.py diff --git a/examples/resource_group.py b/examples/resource_group.py new file mode 100644 index 000000000..828a7357a --- /dev/null +++ b/examples/resource_group.py @@ -0,0 +1,105 @@ +from pymilvus import ( + MilvusClient, + DataType, +) +from pymilvus.client.constants import DEFAULT_RESOURCE_GROUP + +from pymilvus.client.types import ( + ResourceGroupConfig, +) + +fmt = "\n=== {:30} ===\n" +dim = 8 +collection_name = "hello_milvus" +milvus_client = MilvusClient("http://localhost:19530") + + +## create collection and load collection +print("create collection and load collection") +collection_name = "hello_milvus" +has_collection = milvus_client.has_collection(collection_name, timeout=5) +if has_collection: + milvus_client.drop_collection(collection_name) + +schema = milvus_client.create_schema(enable_dynamic_field=True) +schema.add_field("id", DataType.INT64, is_primary=True) +schema.add_field("embeddings", DataType.FLOAT_VECTOR, dim=dim) +schema.add_field("title", DataType.VARCHAR, max_length=64) +milvus_client.create_collection(collection_name, schema=schema, consistency_level="Strong") +index_params = milvus_client.prepare_index_params() +index_params.add_index(field_name = "embeddings", metric_type="L2") +index_params.add_index(field_name = "title", index_type = "Trie", index_name="my_trie") +milvus_client.create_index(collection_name, index_params) +milvus_client.load_collection(collection_name) + + +## create resource group +print("create resource group") +milvus_client.create_resource_group("rg1") +milvus_client.create_resource_group("rg2") + +## update resource group +configs = { + "rg1": ResourceGroupConfig( + requests={"node_num": 1}, + limits={"node_num": 5}, + transfer_from=[{"resource_group": DEFAULT_RESOURCE_GROUP}], + transfer_to=[{"resource_group": DEFAULT_RESOURCE_GROUP}], + ), + "rg2": ResourceGroupConfig( + requests={"node_num": 4}, + limits={"node_num": 4}, + transfer_from=[{"resource_group": DEFAULT_RESOURCE_GROUP}], + transfer_to=[{"resource_group": DEFAULT_RESOURCE_GROUP}], + ), + } +milvus_client.update_resource_groups(configs) + +## describe resource group +print("describe rg1") +result = milvus_client.describe_resource_group("rg1") +print(result) + +print("describe rg2") +result = milvus_client.describe_resource_group("rg2") +print(result) + +## list resource group +print("list resource group") +result = milvus_client.list_resource_groups() +print(result) + +## transfer replica +print("transfer replica to rg1") +milvus_client.transfer_replica(DEFAULT_RESOURCE_GROUP, "rg1", collection_name, 1) +print("describe rg1 after transfer replica in") +result = milvus_client.describe_resource_group("rg1") +print(result) + +milvus_client.transfer_replica("rg1", DEFAULT_RESOURCE_GROUP, collection_name, 1) +print("describe rg1 after transfer replica out") +result = milvus_client.describe_resource_group("rg1") +print(result) + +## drop resource group +print("drop resource group") +# create resource group +configs = { + "rg1": ResourceGroupConfig( + requests={"node_num": 0}, + limits={"node_num": 0}, + transfer_from=[{"resource_group": DEFAULT_RESOURCE_GROUP}], + transfer_to=[{"resource_group": DEFAULT_RESOURCE_GROUP}], + ), + "rg2": ResourceGroupConfig( + requests={"node_num": 0}, + limits={"node_num": 0}, + transfer_from=[{"resource_group": DEFAULT_RESOURCE_GROUP}], + transfer_to=[{"resource_group": DEFAULT_RESOURCE_GROUP}], + ), + } +milvus_client.update_resource_groups(configs) +milvus_client.drop_resource_group("rg1") +milvus_client.drop_resource_group("rg2") + + diff --git a/pymilvus/milvus_client/milvus_client.py b/pymilvus/milvus_client/milvus_client.py index d58f0f77a..91f548072 100644 --- a/pymilvus/milvus_client/milvus_client.py +++ b/pymilvus/milvus_client/milvus_client.py @@ -11,6 +11,7 @@ ExtraList, LoadState, OmitZeroDict, + ResourceGroupConfig, construct_cost_extra, ) from pymilvus.client.utils import is_vector_type @@ -1525,3 +1526,102 @@ def remove_privileges_from_group( """ conn = self._get_connection() conn.remove_privileges_from_group(group_name, privileges, timeout=timeout, **kwargs) + + def create_resource_group(self, name: str, timeout: Optional[float] = None, **kwargs): + """Create a resource group + It will success whether or not the resource group exists. + + Args: + name: The name of the resource group. + Raises: + MilvusException: If anything goes wrong. + """ + conn = self._get_connection() + return conn.create_resource_group(name, timeout, **kwargs) + + def update_resource_groups( + self, + configs: Dict[str, ResourceGroupConfig], + timeout: Optional[float] = None, + ): + """Update resource groups. + This function updates the resource groups based on the provided configurations. + + Args: + configs: A mapping of resource group names to their configurations. + timeout: The timeout value in seconds. Defaults to None. + Raises: + MilvusException: If anything goes wrong. + """ + conn = self._get_connection() + return conn.update_resource_groups(configs, timeout) + + def drop_resource_group( + self, + name: str, + timeout: Optional[float] = None, + ): + """Drop a resource group + It will success if the resource group is existed and empty, otherwise fail. + + Args: + name: The name of the resource group. + timeout: The timeout value in seconds. Defaults to None. + Raises: + MilvusException: If anything goes wrong. + """ + conn = self._get_connection() + return conn.drop_resource_group(name, timeout) + + def describe_resource_group(self, name: str, timeout: Optional[float] = None): + """Drop a resource group + It will success if the resource group is existed and empty, otherwise fail. + + Args: + name: The name of the resource group. + timeout: The timeout value in seconds. Defaults to None. + Returns: + ResourceGroupInfo: The detail info of the resource group. + Raises: + MilvusException: If anything goes wrong. + """ + conn = self._get_connection() + return conn.describe_resource_group(name, timeout) + + def list_resource_groups(self, timeout: Optional[float] = None): + """list all resource group names + + Args: + timeout: The timeout value in seconds. Defaults to None. + Returns: + list[str]: all resource group names + Raises: + MilvusException: If anything goes wrong. + """ + conn = self._get_connection() + return conn.list_resource_groups(timeout) + + def transfer_replica( + self, + source_group: str, + target_group: str, + collection_name: str, + num_replicas: int, + timeout: Optional[float] = None, + ): + """transfer num_replica from source resource group to target resource group + + Args: + source_group: source resource group name + target_group: target resource group name + collection_name: collection name which replica belong to + num_replicas: transfer replica num + timeout: The timeout value in seconds. Defaults to None. + + Raises: + MilvusException: If anything goes wrong. + """ + conn = self._get_connection() + return conn.transfer_replica( + source_group, target_group, collection_name, num_replicas, timeout + )