Skip to content

Commit

Permalink
enhance: Enable resource group api in milvus client (milvus-io#2513)
Browse files Browse the repository at this point in the history
issue: milvus-io/milvus#38739
this PR enable those API in milvus client:
- create_resource_group
- update_resource_groups
- drop_resource_group
- describe_resource_group
- list_resource_groups
- transfer_replica

cause the resource group prefer to use declarative api, so the
transfer_node API is deprecated in milvus client, it's recommand to use
update_resource_group to manage node num in resource group.

Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 committed Dec 26, 2024
1 parent 292390d commit 4ba030e
Show file tree
Hide file tree
Showing 2 changed files with 205 additions and 0 deletions.
105 changes: 105 additions & 0 deletions examples/resource_group.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
from pymilvus import (
MilvusClient,
DataType,
)
from pymilvus.client.constants import DEFAULT_RESOURCE_GROUP

from pymilvus.client.types import (
ResourceGroupConfig,
)

fmt = "\n=== {:30} ===\n"
dim = 8
collection_name = "hello_milvus"
milvus_client = MilvusClient("http://localhost:19530")


## create collection and load collection
print("create collection and load collection")
collection_name = "hello_milvus"
has_collection = milvus_client.has_collection(collection_name, timeout=5)
if has_collection:
milvus_client.drop_collection(collection_name)

schema = milvus_client.create_schema(enable_dynamic_field=True)
schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("embeddings", DataType.FLOAT_VECTOR, dim=dim)
schema.add_field("title", DataType.VARCHAR, max_length=64)
milvus_client.create_collection(collection_name, schema=schema, consistency_level="Strong")
index_params = milvus_client.prepare_index_params()
index_params.add_index(field_name = "embeddings", metric_type="L2")
index_params.add_index(field_name = "title", index_type = "Trie", index_name="my_trie")
milvus_client.create_index(collection_name, index_params)
milvus_client.load_collection(collection_name)


## create resource group
print("create resource group")
milvus_client.create_resource_group("rg1")
milvus_client.create_resource_group("rg2")

## update resource group
configs = {
"rg1": ResourceGroupConfig(
requests={"node_num": 1},
limits={"node_num": 5},
transfer_from=[{"resource_group": DEFAULT_RESOURCE_GROUP}],
transfer_to=[{"resource_group": DEFAULT_RESOURCE_GROUP}],
),
"rg2": ResourceGroupConfig(
requests={"node_num": 4},
limits={"node_num": 4},
transfer_from=[{"resource_group": DEFAULT_RESOURCE_GROUP}],
transfer_to=[{"resource_group": DEFAULT_RESOURCE_GROUP}],
),
}
milvus_client.update_resource_groups(configs)

## describe resource group
print("describe rg1")
result = milvus_client.describe_resource_group("rg1")
print(result)

print("describe rg2")
result = milvus_client.describe_resource_group("rg2")
print(result)

## list resource group
print("list resource group")
result = milvus_client.list_resource_groups()
print(result)

## transfer replica
print("transfer replica to rg1")
milvus_client.transfer_replica(DEFAULT_RESOURCE_GROUP, "rg1", collection_name, 1)
print("describe rg1 after transfer replica in")
result = milvus_client.describe_resource_group("rg1")
print(result)

milvus_client.transfer_replica("rg1", DEFAULT_RESOURCE_GROUP, collection_name, 1)
print("describe rg1 after transfer replica out")
result = milvus_client.describe_resource_group("rg1")
print(result)

## drop resource group
print("drop resource group")
# create resource group
configs = {
"rg1": ResourceGroupConfig(
requests={"node_num": 0},
limits={"node_num": 0},
transfer_from=[{"resource_group": DEFAULT_RESOURCE_GROUP}],
transfer_to=[{"resource_group": DEFAULT_RESOURCE_GROUP}],
),
"rg2": ResourceGroupConfig(
requests={"node_num": 0},
limits={"node_num": 0},
transfer_from=[{"resource_group": DEFAULT_RESOURCE_GROUP}],
transfer_to=[{"resource_group": DEFAULT_RESOURCE_GROUP}],
),
}
milvus_client.update_resource_groups(configs)
milvus_client.drop_resource_group("rg1")
milvus_client.drop_resource_group("rg2")


100 changes: 100 additions & 0 deletions pymilvus/milvus_client/milvus_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
ExtraList,
LoadState,
OmitZeroDict,
ResourceGroupConfig,
construct_cost_extra,
)
from pymilvus.client.utils import is_vector_type
Expand Down Expand Up @@ -1525,3 +1526,102 @@ def remove_privileges_from_group(
"""
conn = self._get_connection()
conn.remove_privileges_from_group(group_name, privileges, timeout=timeout, **kwargs)

def create_resource_group(self, name: str, timeout: Optional[float] = None, **kwargs):
"""Create a resource group
It will success whether or not the resource group exists.
Args:
name: The name of the resource group.
Raises:
MilvusException: If anything goes wrong.
"""
conn = self._get_connection()
return conn.create_resource_group(name, timeout, **kwargs)

def update_resource_groups(
self,
configs: Dict[str, ResourceGroupConfig],
timeout: Optional[float] = None,
):
"""Update resource groups.
This function updates the resource groups based on the provided configurations.
Args:
configs: A mapping of resource group names to their configurations.
timeout: The timeout value in seconds. Defaults to None.
Raises:
MilvusException: If anything goes wrong.
"""
conn = self._get_connection()
return conn.update_resource_groups(configs, timeout)

def drop_resource_group(
self,
name: str,
timeout: Optional[float] = None,
):
"""Drop a resource group
It will success if the resource group is existed and empty, otherwise fail.
Args:
name: The name of the resource group.
timeout: The timeout value in seconds. Defaults to None.
Raises:
MilvusException: If anything goes wrong.
"""
conn = self._get_connection()
return conn.drop_resource_group(name, timeout)

def describe_resource_group(self, name: str, timeout: Optional[float] = None):
"""Drop a resource group
It will success if the resource group is existed and empty, otherwise fail.
Args:
name: The name of the resource group.
timeout: The timeout value in seconds. Defaults to None.
Returns:
ResourceGroupInfo: The detail info of the resource group.
Raises:
MilvusException: If anything goes wrong.
"""
conn = self._get_connection()
return conn.describe_resource_group(name, timeout)

def list_resource_groups(self, timeout: Optional[float] = None):
"""list all resource group names
Args:
timeout: The timeout value in seconds. Defaults to None.
Returns:
list[str]: all resource group names
Raises:
MilvusException: If anything goes wrong.
"""
conn = self._get_connection()
return conn.list_resource_groups(timeout)

def transfer_replica(
self,
source_group: str,
target_group: str,
collection_name: str,
num_replicas: int,
timeout: Optional[float] = None,
):
"""transfer num_replica from source resource group to target resource group
Args:
source_group: source resource group name
target_group: target resource group name
collection_name: collection name which replica belong to
num_replicas: transfer replica num
timeout: The timeout value in seconds. Defaults to None.
Raises:
MilvusException: If anything goes wrong.
"""
conn = self._get_connection()
return conn.transfer_replica(
source_group, target_group, collection_name, num_replicas, timeout
)

0 comments on commit 4ba030e

Please sign in to comment.