From e836ded6cc7465aab77afee5c1a503aaf756ec93 Mon Sep 17 00:00:00 2001 From: chyezh Date: Mon, 19 Feb 2024 16:21:02 +0800 Subject: [PATCH] enhance: add updates resource group for java sdk - Add UpdateResourceGroups and modify AddResourceGroup api. Signed-off-by: chyezh --- .../java/io/milvus/ResourceGroupExample.java | 65 ++++ .../io/milvus/resourcegroup/NodeInfo.java | 46 +++ .../resourcegroup/ResourceGroupInfo.java | 111 +++++++ .../ResourceGroupManagement.java | 279 ++++++++++++++++++ .../client/AbstractMilvusGrpcClient.java | 36 ++- .../java/io/milvus/client/MilvusClient.java | 9 + .../client/MilvusMultiServiceClient.java | 5 + .../io/milvus/client/MilvusServiceClient.java | 5 + .../common/resourcegroup/ResourceGroup.java | 23 ++ .../resourcegroup/ResourceGroupConfig.java | 105 +++++++ .../resourcegroup/ResourceGroupLimit.java | 38 +++ .../resourcegroup/ResourceGroupTransfer.java | 38 +++ .../param/control/GetReplicasParam.java | 10 +- .../CreateResourceGroupParam.java | 15 + .../UpdateResourceGroupsParam.java | 99 +++++++ src/main/milvus-proto | 2 +- 16 files changed, 870 insertions(+), 16 deletions(-) create mode 100644 examples/main/java/io/milvus/ResourceGroupExample.java create mode 100644 examples/main/java/io/milvus/resourcegroup/NodeInfo.java create mode 100644 examples/main/java/io/milvus/resourcegroup/ResourceGroupInfo.java create mode 100644 examples/main/java/io/milvus/resourcegroup/ResourceGroupManagement.java create mode 100644 src/main/java/io/milvus/common/resourcegroup/ResourceGroup.java create mode 100644 src/main/java/io/milvus/common/resourcegroup/ResourceGroupConfig.java create mode 100644 src/main/java/io/milvus/common/resourcegroup/ResourceGroupLimit.java create mode 100644 src/main/java/io/milvus/common/resourcegroup/ResourceGroupTransfer.java create mode 100644 src/main/java/io/milvus/param/resourcegroup/UpdateResourceGroupsParam.java diff --git a/examples/main/java/io/milvus/ResourceGroupExample.java b/examples/main/java/io/milvus/ResourceGroupExample.java new file mode 100644 index 000000000..8ad5aabe0 --- /dev/null +++ b/examples/main/java/io/milvus/ResourceGroupExample.java @@ -0,0 +1,65 @@ +package io.milvus; + +import com.google.gson.Gson; + +import io.milvus.client.MilvusServiceClient; +import io.milvus.resourcegroup.ResourceGroupManagement; +import io.milvus.param.ConnectParam; + +public class ResourceGroupExample { + private static final ResourceGroupManagement manager; + private static final String rgName1 = "rg1"; + private static final String rgName2 = "rg2"; + private static Gson gson = new Gson(); + + static { + ConnectParam connectParam = ConnectParam.newBuilder() + .withHost("localhost") + .withPort(19530) + .withAuthorization("root", "Milvus") + .build(); + manager = new ResourceGroupManagement(new MilvusServiceClient(connectParam)); + } + + private static void printResourceGroupInfo() { + manager.listResourceGroups().forEach((name, rg) -> { + System.out.println(name); + System.out.println(gson.toJson(rg)); + }); + } + + public static void main(String[] args) throws Exception { + printResourceGroupInfo(); + // Initialize the cluster with 1 resource group + // default_rg: 1 + manager.initializeCluster(1); + printResourceGroupInfo(); + + // Add one more node to default rg. + manager.scaleResourceGroupTo(ResourceGroupManagement.DEFAULT_RG, 2); + // add new query node. + // default_rg: 2 + printResourceGroupInfo(); + + // Add a new resource group. + manager.createResourceGroup(rgName1, 1); + // default_rg: 2, rg1: 0 + // add new query node. + // default_rg: 2, rg1: 1 + printResourceGroupInfo(); + + // Add a new resource group. + manager.createResourceGroup(rgName2, 2); + // default_rg: 2, rg1: 1, rg2: 0 + // add new query node. + // default_rg: 2, rg1: 1, rg2: 1 + // add new query node. + // default_rg: 2, rg1: 1, rg2: 2 + printResourceGroupInfo(); + + // downscale default_rg to 1 + manager.scaleResourceGroupTo(ResourceGroupManagement.DEFAULT_RG, 1); + // default_rg: 1, rg1: 1, rg2: 2, recycle_rg: 1 + printResourceGroupInfo(); + } +} \ No newline at end of file diff --git a/examples/main/java/io/milvus/resourcegroup/NodeInfo.java b/examples/main/java/io/milvus/resourcegroup/NodeInfo.java new file mode 100644 index 000000000..87ce23809 --- /dev/null +++ b/examples/main/java/io/milvus/resourcegroup/NodeInfo.java @@ -0,0 +1,46 @@ +package io.milvus.resourcegroup; + +import lombok.Getter; +import lombok.NonNull; + +@Getter +public class NodeInfo { + private long nodeId; + private String address; + private String hostname; + + private NodeInfo(Builder builder) { + this.nodeId = builder.nodeId; + this.address = builder.address; + this.hostname = builder.hostname; + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder { + private long nodeId; + private String address; + private String hostname; + + public Builder withNodeId(long nodeId) { + this.nodeId = nodeId; + return this; + } + + public Builder withAddress(@NonNull String address) { + this.address = address; + return this; + } + + public Builder withHostname(@NonNull String hostname) { + this.hostname = hostname; + return this; + } + + public NodeInfo build() { + return new NodeInfo(this); + } + } +} diff --git a/examples/main/java/io/milvus/resourcegroup/ResourceGroupInfo.java b/examples/main/java/io/milvus/resourcegroup/ResourceGroupInfo.java new file mode 100644 index 000000000..4a61bf96f --- /dev/null +++ b/examples/main/java/io/milvus/resourcegroup/ResourceGroupInfo.java @@ -0,0 +1,111 @@ +package io.milvus.resourcegroup; + +import java.util.HashSet; +import java.util.Set; + +import io.milvus.common.resourcegroup.ResourceGroupConfig; +import lombok.Getter; +import lombok.NonNull; + +@Getter +public class ResourceGroupInfo { + private String resourceGroupName; + private ResourceGroupConfig resourceGroupConfig; + private Set fullDatabases; // databases belong to this resource group completely. + private Set partialDatabases; // databases belong to this resource group partially, some collection is in + // other resource group. + private Set nodes; // actual query node in this resource group. + private Integer requestsNodeNum; // max query node in this resource group. + + private ResourceGroupInfo(@NonNull Builder builder) { + this.resourceGroupName = builder.resourceGroupName; + this.resourceGroupConfig = builder.resourceGroupConfig; + this.fullDatabases = builder.fullDatabases; + if (this.fullDatabases == null) { + this.fullDatabases = new HashSet(); + } + this.partialDatabases = builder.partialDatabases; + if (this.partialDatabases == null) { + this.partialDatabases = new HashSet(); + } + this.nodes = builder.nodes; + if (this.nodes == null) { + this.nodes = new HashSet(); + } + this.requestsNodeNum = builder.requestsNodeNum; + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static final class Builder { + private String resourceGroupName; + private ResourceGroupConfig resourceGroupConfig; + private Set fullDatabases; + private Set partialDatabases; + private Set nodes; // actual query node in this resource group. + private Integer requestsNodeNum; + + public Builder withResourceGroupName(@NonNull String resourceGroupName) { + this.resourceGroupName = resourceGroupName; + return this; + } + + public Builder addFullDatabases(@NonNull String databaseName) { + if (this.fullDatabases == null) { + this.fullDatabases = new HashSet(); + } + this.fullDatabases.add(databaseName); + return this; + } + + public Builder addPartialDatabases(@NonNull String databaseName) { + if (this.partialDatabases == null) { + this.partialDatabases = new HashSet(); + } + this.partialDatabases.add(databaseName); + return this; + } + + public Builder addAvailableNode(@NonNull NodeInfo node) { + if (this.nodes == null) { + this.nodes = new HashSet(); + } + this.nodes.add(node); + return this; + } + + public Builder withRequestsNodeNum(@NonNull Integer requestsNodeNum) { + this.requestsNodeNum = requestsNodeNum; + return this; + } + + public Builder withConfig(@NonNull ResourceGroupConfig resourceGroupConfig) { + this.resourceGroupConfig = resourceGroupConfig; + return this; + } + + public ResourceGroupInfo build() { + return new ResourceGroupInfo(this); + } + } + + /** + * Check if this resource group is the default resource group. + * + * @return true if this resource group is the default resource group. + */ + public boolean isDefaultResourceGroup() { + return this.resourceGroupName == ResourceGroupManagement.DEFAULT_RG; + } + + /** + * Check if this resource group is the recycle resource group. + * + * @return true if this resource group is the recycle resource group. + */ + public boolean isRecycleResourceGroup() { + return this.resourceGroupName == ResourceGroupManagement.RECYCLE_RG; + } +} diff --git a/examples/main/java/io/milvus/resourcegroup/ResourceGroupManagement.java b/examples/main/java/io/milvus/resourcegroup/ResourceGroupManagement.java new file mode 100644 index 000000000..169c436c5 --- /dev/null +++ b/examples/main/java/io/milvus/resourcegroup/ResourceGroupManagement.java @@ -0,0 +1,279 @@ +package io.milvus.resourcegroup; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import io.milvus.client.MilvusClient; +import io.milvus.param.R; +import io.milvus.param.RpcStatus; +import io.milvus.param.collection.GetLoadStateParam; +import io.milvus.param.collection.ShowCollectionsParam; +import io.milvus.param.control.GetReplicasParam; +import io.milvus.param.resourcegroup.CreateResourceGroupParam; +import io.milvus.param.resourcegroup.DescribeResourceGroupParam; +import io.milvus.param.resourcegroup.DropResourceGroupParam; +import io.milvus.param.resourcegroup.ListResourceGroupsParam; +import io.milvus.param.resourcegroup.TransferReplicaParam; +import io.milvus.param.resourcegroup.UpdateResourceGroupsParam; +import io.milvus.common.resourcegroup.ResourceGroupConfig; +import io.milvus.common.resourcegroup.ResourceGroupLimit; +import io.milvus.common.resourcegroup.ResourceGroupTransfer; +import io.milvus.exception.MilvusException; +import io.milvus.grpc.DescribeResourceGroupResponse; +import io.milvus.grpc.GetLoadStateResponse; +import io.milvus.grpc.GetReplicasResponse; +import io.milvus.grpc.ListDatabasesResponse; +import io.milvus.grpc.ListResourceGroupsResponse; +import io.milvus.grpc.LoadState; +import io.milvus.grpc.ShowCollectionsResponse; + +import org.slf4j.LoggerFactory; +import org.slf4j.Logger; + +public class ResourceGroupManagement { + public static String RECYCLE_RG = "__recycle_resource_group"; + public static String DEFAULT_RG = "__default_resource_group"; + static Integer RECYCLE_RG_REQUEST_NODE_NUM = 0; + static Integer RECYCLE_RG_LIMIT_NODE_NUM = 100000; + + protected static final Logger logger = LoggerFactory.getLogger(ResourceGroupManagement.class); + + private MilvusClient client; + + public ResourceGroupManagement(MilvusClient client) { + this.client = client; + } + + /** + * list all resource groups. + * + * @return map of resource group name and resource group info. + */ + public Map listResourceGroups() throws Exception { + // List all resource groups. + R response = client + .listResourceGroups(ListResourceGroupsParam.newBuilder().build()); + ListResourceGroupsResponse resourceGroups = unwrap(response); + + // Describe all resource groups. + Map result = new HashMap<>(); + for (String resourceGroupName : resourceGroups.getResourceGroupsList()) { + R resourceGroupInfoResp = client.describeResourceGroup( + DescribeResourceGroupParam.newBuilder().withGroupName(resourceGroupName).build()); + DescribeResourceGroupResponse resourceGroupInfo = unwrap(resourceGroupInfoResp); + + ResourceGroupInfo.Builder builder = ResourceGroupInfo.newBuilder() + .withResourceGroupName(resourceGroupName) + .withRequestsNodeNum(resourceGroupInfo.getResourceGroup().getConfig().getRequests().getNodeNum()) + .withConfig(new ResourceGroupConfig(resourceGroupInfo.getResourceGroup().getConfig())); + + resourceGroupInfo.getResourceGroup().getNodesList().forEach(node -> { + builder.addAvailableNode(NodeInfo.newBuilder() + .withNodeId(node.getNodeID()) + .withAddress(node.getAddress()) + .withHostname(node.getHostname()) + .build()); + }); + result.put(resourceGroupName, builder); + } + + // Get map info between resource group and database. + R listDatabaseGroupsResp = client.listDatabases(); + ListDatabasesResponse databases = unwrap(listDatabaseGroupsResp); + // list all collections. + for (String dbName : databases.getDbNamesList()) { + R resp = client.showCollections(ShowCollectionsParam.newBuilder() + .withDatabaseName(dbName) + .build()); + ShowCollectionsResponse showCollectionResponse = unwrap(resp); + + Set resourceGroupNames = new HashSet<>(); + for (String collection : showCollectionResponse.getCollectionNamesList()) { + String resourceGroupName = getCollectionResourceGroupName(dbName, collection); + resourceGroupNames.add(resourceGroupName); + } + + if (resourceGroupNames.size() == 0) { + logger.info("no loaded collection in database {}", dbName); + continue; + } else if (resourceGroupNames.size() == 1) { + // all loaded collection in one resource group. + for (String resourceGroupName : resourceGroupNames) { + result.get(resourceGroupName).addFullDatabases(dbName); + } + } else { + // loaded collection in same db in different resource group. + for (String resourceGroupName : resourceGroupNames) { + result.get(resourceGroupName).addPartialDatabases(dbName); + } + } + } + + return result.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build())); + } + + /** + * Initialize the cluster with a recycle resource group. + * + * @param defaultResourceGroupNodeNum The number of query nodes to initialize + * the default resource group. + */ + public void initializeCluster(Integer defaultResourceGroupNodeNum) throws Exception { + // Create a recycle resource group to hold all redundant query node. + R resp = client.createResourceGroup(CreateResourceGroupParam.newBuilder() + .withGroupName(RECYCLE_RG) + .withConfig(ResourceGroupConfig.newBuilder() + .withRequests(new ResourceGroupLimit(RECYCLE_RG_REQUEST_NODE_NUM)) + .withLimits(new ResourceGroupLimit(RECYCLE_RG_LIMIT_NODE_NUM)) + .build()) + .build()); + unwrap(resp); + this.scaleResourceGroupTo(DEFAULT_RG, defaultResourceGroupNodeNum); + } + + /** + * Create a resource group with given nodeNum. + * + * @param resourceGroupName + * @param requestNodeNum + */ + public void createResourceGroup(String resourceGroupName, Integer requestNodeNum) throws Exception { + R resp = client.createResourceGroup(CreateResourceGroupParam.newBuilder() + .withGroupName(resourceGroupName) + .withConfig(ResourceGroupConfig.newBuilder() + .withRequests(new ResourceGroupLimit(requestNodeNum)) + .withLimits(new ResourceGroupLimit(requestNodeNum)) + .withFrom(Arrays.asList(new ResourceGroupTransfer(RECYCLE_RG))) + .withTo(Arrays.asList(new ResourceGroupTransfer(RECYCLE_RG))) + .build()) + .build()); + unwrap(resp); + } + + /** + * Drop a resource group, before drop resource group, you should scale the + * resource group to 0 first. + * + * @param resourceGroupName + */ + public void dropResourceGroup(String resourceGroupName) throws Exception { + R resp = client + .dropResourceGroup(DropResourceGroupParam.newBuilder().withGroupName(resourceGroupName).build()); + unwrap(resp); + } + + /** + * Scale to the number of nodes in a resource group. + * + * @param resourceGroupName + * @param requestNodeNum + */ + public void scaleResourceGroupTo(String resourceGroupName, Integer requestNodeNum) throws Exception { + if (resourceGroupName == RECYCLE_RG) { + throw new IllegalArgumentException("Cannot scale to recycle resource group"); + } + // Update a resource group with given nodeNum. + // Set the resource group transfer to recycle resource group by default. + R resp = client + .updateResourceGroups(UpdateResourceGroupsParam.newBuilder() + .putResourceGroup(resourceGroupName, ResourceGroupConfig.newBuilder() + .withRequests(new ResourceGroupLimit(requestNodeNum)) + .withLimits(new ResourceGroupLimit(requestNodeNum)) + .withFrom(Arrays.asList(new ResourceGroupTransfer(RECYCLE_RG))) + .withTo(Arrays.asList(new ResourceGroupTransfer(RECYCLE_RG))) + .build()) + .build()); + unwrap(resp); + } + + /** + * Transfer a database to specified resource group. + * Only support single replica now. + * + * @param dbName The name of the database to transfer. + * @param resourceGroupName The name of the target resource group. + */ + public void transferDataBaseToResourceGroup(String dbName, String resourceGroupName) throws Exception { + R resp = client.showCollections(ShowCollectionsParam.newBuilder() + .withDatabaseName(dbName) + .build()); + ShowCollectionsResponse showCollectionResponse = unwrap(resp); + + for (String collection : showCollectionResponse.getCollectionNamesList()) { + String currentResourceGroupName = getCollectionResourceGroupName(dbName, collection); + // skip if the collection is not loaded or is already added to resourceGroup. + if (currentResourceGroupName == null || currentResourceGroupName == resourceGroupName) { + continue; + } + R status = client.transferReplica(TransferReplicaParam.newBuilder() + .withDatabaseName(dbName) + .withCollectionName(collection) + .withSourceGroupName(currentResourceGroupName) + .withTargetGroupName(resourceGroupName) + .withReplicaNumber(1L) + .build()); + unwrap(status); + } + } + + /** + * get the resource group name of the collection. + * + * @param dbName + * @param collection + * @return + * @throws Exception + */ + private String getCollectionResourceGroupName(String dbName, String collection) throws Exception { + R loaded = client.getLoadState(GetLoadStateParam.newBuilder(). + withDatabaseName(dbName). + withCollectionName(collection). + build()); + GetLoadStateResponse loadedState = unwrap(loaded); + + if (loadedState.getState() != LoadState.LoadStateLoaded) { + logger.info("Collection {} @ Database {} is not loaded, state is {}, skip it", collection, dbName, + loaded.getData().getState()); + return null; + } + + // Get the current resource group of the collection. + R replicaResp = client + .getReplicas(GetReplicasParam.newBuilder().withCollectionName(collection).withDatabaseName(dbName).build()); + GetReplicasResponse replicas = unwrap(replicaResp); + + if (replicas.getReplicasCount() > 1) { + // Multi replica is now supported now. + throw new RuntimeException(String.format("Replica number {} is greater than 1, did not support now", + replicas.getReplicasCount())); + } + if (replicas.getReplicasCount() == 0) { + logger.warn("Collection {} @ Database {} has no replica, skip it", collection, dbName); + return null; + } + + return replicas.getReplicasList().get(0).getResourceGroupName(); + } + + /** + * + * @param + * @param response + * @return + */ + private T unwrap(R response) throws Exception { + if (response.getStatus() != 0) { + if (response.getException() instanceof MilvusException) { + MilvusException e = (MilvusException) response.getException(); + throw e; + } + logger.warn("at unwrap", response.getException()); + throw response.getException(); + } + return response.getData(); + } +} diff --git a/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java b/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java index 4d1fa1209..14149ba4a 100644 --- a/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java +++ b/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java @@ -1989,22 +1989,15 @@ public R getReplicas(GetReplicasParam requestParam) { String title = String.format("GetReplicasRequest collectionName:%s", requestParam.getCollectionName()); try { - R descResp = describeCollection(DescribeCollectionParam.newBuilder() - .withCollectionName(requestParam.getCollectionName()) - .build()); - if (descResp.getStatus() != R.Status.Success.getCode()) { - return R.failed(descResp.getException()); - } - GetReplicasRequest.Builder builder = GetReplicasRequest.newBuilder() - .setCollectionID(descResp.getData().getCollectionID()) + .setCollectionName(requestParam.getCollectionName()) .setWithShardNodes(requestParam.isWithShardNodes()); if (StringUtils.isNotBlank(requestParam.getDatabaseName())) { builder.setDbName(requestParam.getDatabaseName()); } - - GetReplicasResponse response = blockingStub().getReplicas(builder.build()); + GetReplicasRequest getReplicasRequest = builder.build(); + GetReplicasResponse response = blockingStub().getReplicas(getReplicasRequest); handleResponse(title, response.getStatus()); return R.success(response); } catch (StatusRuntimeException e) { @@ -2760,6 +2753,7 @@ public R createResourceGroup(CreateResourceGroupParam requestParam) { try { CreateResourceGroupRequest request = CreateResourceGroupRequest.newBuilder() .setResourceGroup(requestParam.getGroupName()) + .setConfig(requestParam.getConfig().toGRPC()) .build(); Status response = blockingStub().createResourceGroup(request); @@ -2913,6 +2907,28 @@ public R transferReplica(TransferReplicaParam requestParam) { } } + @Override + public R updateResourceGroups(UpdateResourceGroupsParam requestParam) { + if (!clientIsReady()) { + return R.failed(new ClientNotConnectedException("Client rpc channel is not ready")); + } + + logDebug(requestParam.toString()); + + try { + UpdateResourceGroupsRequest request = requestParam.toGRPC(); + + Status response = blockingStub().updateResourceGroups(request); + handleResponse(requestParam.toString(), response); + return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG)); + } catch (StatusRuntimeException e) { + logError("{} RPC failed! Exception:{}", requestParam.toString(), e); + return R.failed(e); + } catch (Exception e) { + logError("{} failed! Exception:{}", requestParam.toString(), e); + return R.failed(e); + } + } ///////////////////// High Level API////////////////////// @Override diff --git a/src/main/java/io/milvus/client/MilvusClient.java b/src/main/java/io/milvus/client/MilvusClient.java index e64ad96a5..8d5597f55 100644 --- a/src/main/java/io/milvus/client/MilvusClient.java +++ b/src/main/java/io/milvus/client/MilvusClient.java @@ -731,6 +731,15 @@ default void close() { */ R createResourceGroup(CreateResourceGroupParam requestParam); + + /** + * Update resource groups. + * + * @param requestParam {@link UpdateResourceGroupsParam} + * @return {status:result code, data:RpcStatus{msg: result message}} + */ + R updateResourceGroups(UpdateResourceGroupsParam requestParam); + /** * Drop a resource group. * diff --git a/src/main/java/io/milvus/client/MilvusMultiServiceClient.java b/src/main/java/io/milvus/client/MilvusMultiServiceClient.java index 37ba3434c..cfb0f59cf 100644 --- a/src/main/java/io/milvus/client/MilvusMultiServiceClient.java +++ b/src/main/java/io/milvus/client/MilvusMultiServiceClient.java @@ -603,6 +603,11 @@ public R createResourceGroup(CreateResourceGroupParam requestParam) { return this.clusterFactory.getMaster().getClient().createResourceGroup(requestParam); } + @Override + public R updateResourceGroups(UpdateResourceGroupsParam requestParam) { + return this.clusterFactory.getMaster().getClient().updateResourceGroups(requestParam); + } + @Override public R dropResourceGroup(DropResourceGroupParam requestParam) { return this.clusterFactory.getMaster().getClient().dropResourceGroup(requestParam); diff --git a/src/main/java/io/milvus/client/MilvusServiceClient.java b/src/main/java/io/milvus/client/MilvusServiceClient.java index 485141ffd..2d01e496a 100644 --- a/src/main/java/io/milvus/client/MilvusServiceClient.java +++ b/src/main/java/io/milvus/client/MilvusServiceClient.java @@ -730,6 +730,11 @@ public R transferReplica(TransferReplicaParam requestParam) { return retry(()-> super.transferReplica(requestParam)); } + @Override + public R updateResourceGroups(UpdateResourceGroupsParam requestParam) { + return retry(()-> super.updateResourceGroups(requestParam)); + } + @Override public R createCollection(CreateSimpleCollectionParam requestParam) { return retry(()-> super.createCollection(requestParam)); diff --git a/src/main/java/io/milvus/common/resourcegroup/ResourceGroup.java b/src/main/java/io/milvus/common/resourcegroup/ResourceGroup.java new file mode 100644 index 000000000..243a9fd1d --- /dev/null +++ b/src/main/java/io/milvus/common/resourcegroup/ResourceGroup.java @@ -0,0 +1,23 @@ +package io.milvus.common.resourcegroup; + +import lombok.Getter; +import lombok.NonNull; + +@Getter +public class ResourceGroup { + private String resourceGroupName; + private Integer availableNodeNum; + private java.util.Map loadedReplicaNum; + private java.util.Map outgoingNodeNum; + private java.util.Map incomingNodeNum; + private ResourceGroupConfig resourceGroupConfig; + + public ResourceGroup(@NonNull io.milvus.grpc.ResourceGroup rg) { + this.resourceGroupName = rg.getName(); + this.availableNodeNum = rg.getNumAvailableNode(); + this.loadedReplicaNum = rg.getNumLoadedReplicaMap(); + this.outgoingNodeNum = rg.getNumOutgoingNodeMap(); + this.incomingNodeNum = rg.getNumIncomingNodeMap(); + this.resourceGroupConfig = new ResourceGroupConfig(rg.getConfig()); + } +} diff --git a/src/main/java/io/milvus/common/resourcegroup/ResourceGroupConfig.java b/src/main/java/io/milvus/common/resourcegroup/ResourceGroupConfig.java new file mode 100644 index 000000000..79c933ae0 --- /dev/null +++ b/src/main/java/io/milvus/common/resourcegroup/ResourceGroupConfig.java @@ -0,0 +1,105 @@ +package io.milvus.common.resourcegroup; + +import java.util.stream.Collectors; +import java.util.List; +import lombok.NonNull; +import lombok.Getter; + +@Getter +public class ResourceGroupConfig { + private final ResourceGroupLimit requests; + private final ResourceGroupLimit limits; + private final List from; + private final List to; + + private ResourceGroupConfig(Builder builder) { + this.requests = builder.requests; + this.limits = builder.limits; + + if (null == builder.from) { + this.from = List.of(); + } else { + this.from = builder.from; + } + + if (null == builder.to) { + this.to = List.of(); + } else { + this.to = builder.to; + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static final class Builder { + private ResourceGroupLimit requests; + private ResourceGroupLimit limits; + private List from; + private List to; + + private Builder() { + } + + public Builder withRequests(@NonNull ResourceGroupLimit requests) { + this.requests = requests; + return this; + } + + public Builder withLimits(@NonNull ResourceGroupLimit limits) { + this.limits = limits; + return this; + } + + /** + * Set the transfer from list. + * + * @param from + * @return + */ + public Builder withFrom(@NonNull List from) { + this.from = from; + return this; + } + + /** + * Set the transfer to list. + * + * @param to + * @return + */ + public Builder withTo(@NonNull List to) { + this.to = to; + return this; + } + + public ResourceGroupConfig build() { + return new ResourceGroupConfig(this); + } + } + + public ResourceGroupConfig(@NonNull io.milvus.grpc.ResourceGroupConfig grpcConfig) { + this.requests = new ResourceGroupLimit(grpcConfig.getRequests()); + this.limits = new ResourceGroupLimit(grpcConfig.getLimits()); + this.from = grpcConfig.getTransferFromList().stream() + .map(transfer -> new ResourceGroupTransfer(transfer)) + .collect(Collectors.toList()); + this.to = grpcConfig.getTransferToList().stream() + .map(transfer -> new ResourceGroupTransfer(transfer)) + .collect(Collectors.toList()); + } + + public @NonNull io.milvus.grpc.ResourceGroupConfig toGRPC() { + io.milvus.grpc.ResourceGroupConfig.Builder builder = io.milvus.grpc.ResourceGroupConfig.newBuilder() + .setRequests(io.milvus.grpc.ResourceGroupLimit.newBuilder().setNodeNum(requests.getNodeNum())) + .setLimits(io.milvus.grpc.ResourceGroupLimit.newBuilder().setNodeNum(limits.getNodeNum())); + for (ResourceGroupTransfer transfer : from) { + builder.addTransferFrom(transfer.toGRPC()); + } + for (ResourceGroupTransfer transfer : to) { + builder.addTransferTo(transfer.toGRPC()); + } + return builder.build(); + } +} diff --git a/src/main/java/io/milvus/common/resourcegroup/ResourceGroupLimit.java b/src/main/java/io/milvus/common/resourcegroup/ResourceGroupLimit.java new file mode 100644 index 000000000..51890e8de --- /dev/null +++ b/src/main/java/io/milvus/common/resourcegroup/ResourceGroupLimit.java @@ -0,0 +1,38 @@ +package io.milvus.common.resourcegroup; + +import org.jetbrains.annotations.NotNull; + +import lombok.Getter; +import lombok.NonNull; + +@Getter +public class ResourceGroupLimit { + private Integer nodeNum; + + /** + * Constructor with node number. + * + * @param nodeNum + */ + public ResourceGroupLimit(@NonNull Integer nodeNum) { + this.nodeNum = nodeNum; + } + + /** + * Constructor from grpc + * + * @param grpcLimit + */ + public ResourceGroupLimit(@NonNull io.milvus.grpc.ResourceGroupLimit grpcLimit) { + this.nodeNum = grpcLimit.getNodeNum(); + } + + /** + * Transfer to grpc + * + * @return io.milvus.grpc.ResourceGroupLimit + */ + public @NonNull io.milvus.grpc.ResourceGroupLimit toGRPC() { + return io.milvus.grpc.ResourceGroupLimit.newBuilder().setNodeNum(nodeNum).build(); + } +} diff --git a/src/main/java/io/milvus/common/resourcegroup/ResourceGroupTransfer.java b/src/main/java/io/milvus/common/resourcegroup/ResourceGroupTransfer.java new file mode 100644 index 000000000..23a9ce746 --- /dev/null +++ b/src/main/java/io/milvus/common/resourcegroup/ResourceGroupTransfer.java @@ -0,0 +1,38 @@ +package io.milvus.common.resourcegroup; + +import lombok.Getter; +import lombok.NonNull; + +@Getter +public class ResourceGroupTransfer { + private String resourceGroupName; + + /** + * Constructor with resource group name. + * + * @param resourceGroupName + */ + public ResourceGroupTransfer(@NonNull String resourceGroupName) { + this.resourceGroupName = resourceGroupName; + } + + /** + * Constructor from grpc + * + * @param grpcTransfer + */ + public ResourceGroupTransfer(@NonNull io.milvus.grpc.ResourceGroupTransfer grpcTransfer) { + this.resourceGroupName = grpcTransfer.getResourceGroup(); + } + + /** + * Transfer to grpc + * + * @return io.milvus.grpc.ResourceGroupTransfer + */ + public @NonNull io.milvus.grpc.ResourceGroupTransfer toGRPC() { + return io.milvus.grpc.ResourceGroupTransfer.newBuilder() + .setResourceGroup(resourceGroupName) + .build(); + } +} diff --git a/src/main/java/io/milvus/param/control/GetReplicasParam.java b/src/main/java/io/milvus/param/control/GetReplicasParam.java index 7c5118f2a..0e278aeba 100644 --- a/src/main/java/io/milvus/param/control/GetReplicasParam.java +++ b/src/main/java/io/milvus/param/control/GetReplicasParam.java @@ -56,12 +56,12 @@ private Builder() { } /** - * Sets the database name. database name can be nil. - * - * @param databaseName database name - * @return Builder + * Sets the database name. Database name cannot be empty or null. + * + * @param databaseName + * @return */ - public Builder withDatabaseName(String databaseName) { + public Builder withDatabaseName(@NonNull String databaseName) { this.databaseName = databaseName; return this; } diff --git a/src/main/java/io/milvus/param/resourcegroup/CreateResourceGroupParam.java b/src/main/java/io/milvus/param/resourcegroup/CreateResourceGroupParam.java index 4f7fab447..44a0454fc 100644 --- a/src/main/java/io/milvus/param/resourcegroup/CreateResourceGroupParam.java +++ b/src/main/java/io/milvus/param/resourcegroup/CreateResourceGroupParam.java @@ -20,6 +20,7 @@ package io.milvus.param.resourcegroup; +import io.milvus.common.resourcegroup.ResourceGroupConfig; import io.milvus.exception.ParamException; import io.milvus.param.ParamUtils; import lombok.Getter; @@ -30,9 +31,11 @@ @ToString public class CreateResourceGroupParam { private final String groupName; + private final ResourceGroupConfig config; private CreateResourceGroupParam(@NonNull Builder builder) { this.groupName = builder.groupName; + this.config = builder.config; } public static Builder newBuilder() { @@ -44,6 +47,7 @@ public static Builder newBuilder() { */ public static final class Builder { private String groupName; + private ResourceGroupConfig config; private Builder() { } @@ -59,6 +63,17 @@ public Builder withGroupName(@NonNull String groupName) { return this; } + /** + * Sets the resource group config. + * + * @param config configuration of resource group + * @return Builder + */ + public Builder withConfig(@NonNull ResourceGroupConfig config) { + this.config = config; + return this; + } + /** * Verifies parameters and creates a new {@link CreateResourceGroupParam} instance. * diff --git a/src/main/java/io/milvus/param/resourcegroup/UpdateResourceGroupsParam.java b/src/main/java/io/milvus/param/resourcegroup/UpdateResourceGroupsParam.java new file mode 100644 index 000000000..854f2e206 --- /dev/null +++ b/src/main/java/io/milvus/param/resourcegroup/UpdateResourceGroupsParam.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.milvus.param.resourcegroup; + +import io.milvus.common.resourcegroup.ResourceGroupConfig; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; + +import lombok.Getter; +import lombok.NonNull; + +@Getter +public class UpdateResourceGroupsParam { + private final Map resourceGroups; + + private UpdateResourceGroupsParam(Builder builder) { + if (null == builder.resourceGroups || builder.resourceGroups.isEmpty()) { + throw new IllegalArgumentException("resourceGroups cannot be empty"); + } + this.resourceGroups = builder.resourceGroups; + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder for {@link UpdateResourceGroupsParam} class. + * + */ + public static final class Builder { + private Map resourceGroups; + + private Builder() { + } + + public Builder putResourceGroup(@NonNull String resourceGroupName, @NonNull ResourceGroupConfig resourceGroup) { + if (null == this.resourceGroups) { + this.resourceGroups = new HashMap<>(); + } + this.resourceGroups.put(resourceGroupName, resourceGroup); + return this; + } + + /** + * Builds the UpdateResourceGroupsParam object. + * + * @return {@link UpdateResourceGroupsParam} + */ + public UpdateResourceGroupsParam build() { + return new UpdateResourceGroupsParam(this); + } + } + + /** + * Converts to grpc request. + * + * @return io.milvus.grpc.UpdateResourceGroupsRequest + */ + public @NonNull io.milvus.grpc.UpdateResourceGroupsRequest toGRPC() { + io.milvus.grpc.UpdateResourceGroupsRequest.Builder builder = io.milvus.grpc.UpdateResourceGroupsRequest + .newBuilder(); + resourceGroups.forEach((k, v) -> { + builder.putResourceGroups(k, v.toGRPC()); + }); + return builder.build(); + } + + /** + * Constructs a String by {@link UpdateResourceGroupsParam} + * instance. + * + * @return String + */ + @Override + public String toString() { + return String.format("UpdateResourceGroupsRequest{resourceGroupNames:%s}", + resourceGroups.keySet().stream().collect(Collectors.joining(","))); + } +} diff --git a/src/main/milvus-proto b/src/main/milvus-proto index 55a0bceef..6c95f3065 160000 --- a/src/main/milvus-proto +++ b/src/main/milvus-proto @@ -1 +1 @@ -Subproject commit 55a0bceef3e286134ee67472f0fdb6e4baf84a52 +Subproject commit 6c95f3065923d215fc2aaa5e701cd6343c9373d7