Skip to content

Commit

Permalink
Implement getReplicas() interface (milvus-io#289)
Browse files Browse the repository at this point in the history
Signed-off-by: groot <[email protected]>
  • Loading branch information
yhmo authored May 7, 2022
1 parent a064f16 commit 16d8310
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 2 deletions.
43 changes: 41 additions & 2 deletions src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1296,7 +1296,7 @@ public R<MutationResult> insert(@NonNull InsertParam requestParam) {
.withCollectionName(requestParam.getCollectionName())
.build());
if (descResp.getStatus() != R.Status.Success.getCode()) {
logInfo("Failed to describe collection: {}", requestParam.getCollectionName());
logError("Failed to describe collection: {}", requestParam.getCollectionName());
return R.failed(R.Status.valueOf(descResp.getStatus()), descResp.getMessage());
}

Expand Down Expand Up @@ -1718,6 +1718,45 @@ public R<GetQuerySegmentInfoResponse> getQuerySegmentInfo(@NonNull GetQuerySegme
}
}

@Override
public R<GetReplicasResponse> getReplicas(GetReplicasParam requestParam) {
if (!clientIsReady()) {
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
}

logInfo(requestParam.toString());

try {
R<DescribeCollectionResponse> descResp = describeCollection(DescribeCollectionParam.newBuilder()
.withCollectionName(requestParam.getCollectionName())
.build());
if (descResp.getStatus() != R.Status.Success.getCode()) {
logError("Failed to describe collection: {}", requestParam.getCollectionName());
return R.failed(R.Status.valueOf(descResp.getStatus()), descResp.getMessage());
}

GetReplicasRequest getReplicasRequest = GetReplicasRequest.newBuilder()
.setCollectionID(descResp.getData().getCollectionID())
.setWithShardNodes(requestParam.isWithShardNodes())
.build();

GetReplicasResponse response = blockingStub().getReplicas(getReplicasRequest);

if (response.getStatus().getErrorCode() == ErrorCode.Success) {
logInfo("GetReplicasRequest successfully!");
return R.success(response);
} else {
return failedStatus("GetReplicasRequest", response.getStatus());
}
} catch (StatusRuntimeException e) {
logError("GetReplicasRequest RPC failed:\n{}", e.getStatus().toString());
return R.failed(e);
} catch (Exception e) {
logError("GetReplicasRequest failed:\n{}", e.getMessage());
return R.failed(e);
}
}

@Override
public R<RpcStatus> loadBalance(LoadBalanceParam requestParam) {
if (!clientIsReady()) {
Expand Down Expand Up @@ -1793,7 +1832,7 @@ public R<ManualCompactionResponse> manualCompaction(ManualCompactionParam reques
.withCollectionName(requestParam.getCollectionName())
.build());
if (descResp.getStatus() != R.Status.Success.getCode()) {
logInfo("ManualCompactionRequest successfully!");
logError("Failed to describe collection: {}", requestParam.getCollectionName());
return R.failed(R.Status.valueOf(descResp.getStatus()), descResp.getMessage());
}

Expand Down
8 changes: 8 additions & 0 deletions src/main/java/io/milvus/client/MilvusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,14 @@ default void close() {
*/
R<GetQuerySegmentInfoResponse> getQuerySegmentInfo(GetQuerySegmentInfoParam requestParam);

/**
* Returns the collection's replica information
*
* @param requestParam {@link GetReplicasParam}
* @return {status:result code, data:GetReplicasResponse{status,info}}
*/
R<GetReplicasResponse> getReplicas(GetReplicasParam requestParam);

/**
* Moves segment from a query node to another to keep the load balanced.
*
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/io/milvus/client/MilvusMultiServiceClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,11 @@ public R<GetQuerySegmentInfoResponse> getQuerySegmentInfo(GetQuerySegmentInfoPar
return this.clusterFactory.getMaster().getClient().getQuerySegmentInfo(requestParam);
}

@Override
public R<GetReplicasResponse> getReplicas(GetReplicasParam requestParam) {
return this.clusterFactory.getMaster().getClient().getReplicas(requestParam);
}

@Override
public R<RpcStatus> loadBalance(LoadBalanceParam requestParam) {
List<R<RpcStatus>> response = this.clusterFactory.getAvailableServerSettings().parallelStream()
Expand Down
68 changes: 68 additions & 0 deletions src/main/java/io/milvus/param/control/GetReplicasParam.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package io.milvus.param.control;

import io.milvus.exception.ParamException;
import io.milvus.param.ParamUtils;
import lombok.Getter;
import lombok.NonNull;

/**
* Parameters for <code>getReplicas</code> interface.
*/
@Getter
public class GetReplicasParam {
private final String collectionName;
private boolean withShardNodes;

private GetReplicasParam(@NonNull Builder builder) {
this.collectionName = builder.collectionName;
this.withShardNodes = true;
}

public static Builder newBuilder() {
return new Builder();
}

/**
* Builder for {@link GetReplicasParam} class.
*/
public static final class Builder {
private String collectionName;

private Builder() {
}

/**
* Sets the collection name. Collection name cannot be empty or null.
*
* @param collectionName collection name
* @return <code>Builder</code>
*/
public Builder withCollectionName(@NonNull String collectionName) {
this.collectionName = collectionName;
return this;
}

/**
* Verifies parameters and creates a new {@link GetReplicasParam} instance.
*
* @return {@link GetReplicasParam}
*/
public GetReplicasParam build() throws ParamException {
ParamUtils.CheckNullEmptyString(collectionName, "Collection name");

return new GetReplicasParam(this);
}
}

/**
* Constructs a <code>String</code> by {@link GetReplicasParam} instance.
*
* @return <code>String</code>
*/
@Override
public String toString() {
return "GetReplicasParam{" +
"collectionName='" + collectionName + '\'' +
'}';
}
}
19 changes: 19 additions & 0 deletions src/test/java/io/milvus/client/MilvusServiceClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2057,6 +2057,25 @@ void getQuerySegmentInfo() {
testFuncByName("getQuerySegmentInfo", param);
}

@Test
void getReplicasParam() {
// test throw exception with illegal input
assertThrows(ParamException.class, () -> GetQuerySegmentInfoParam
.newBuilder()
.withCollectionName("")
.build()
);
}

@Test
void getReplicas() {
GetReplicasParam param = GetReplicasParam.newBuilder()
.withCollectionName("collection1")
.build();

testFuncByName("getReplicas", param);
}

@Test
void loadBalanceParam() {
// test throw exception with illegal input
Expand Down
14 changes: 14 additions & 0 deletions src/test/java/io/milvus/server/MockMilvusServerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class MockMilvusServerImpl extends MilvusServiceGrpc.MilvusServiceImplBas
private io.milvus.grpc.GetFlushStateResponse respGetFlushState;
private io.milvus.grpc.GetPersistentSegmentInfoResponse respGetPersistentSegmentInfo;
private io.milvus.grpc.GetQuerySegmentInfoResponse respGetQuerySegmentInfo;
private io.milvus.grpc.GetReplicasResponse respGetReplicas;
private io.milvus.grpc.GetMetricsResponse respGetMetrics;

private io.milvus.grpc.Status respLoadBalance;
Expand Down Expand Up @@ -513,6 +514,19 @@ public void setGetQuerySegmentInfoResponse(io.milvus.grpc.GetQuerySegmentInfoRes
respGetQuerySegmentInfo = resp;
}

@Override
public void getReplicas(io.milvus.grpc.GetReplicasRequest request,
io.grpc.stub.StreamObserver<io.milvus.grpc.GetReplicasResponse> responseObserver) {
logger.info("getReplicas() call");

responseObserver.onNext(respGetReplicas);
responseObserver.onCompleted();
}

public void setGetReplicasResponse(io.milvus.grpc.GetReplicasResponse resp) {
respGetReplicas = resp;
}

@Override
public void getMetrics(io.milvus.grpc.GetMetricsRequest request,
io.grpc.stub.StreamObserver<io.milvus.grpc.GetMetricsResponse> responseObserver) {
Expand Down

0 comments on commit 16d8310

Please sign in to comment.