Skip to content

Commit

Permalink
Experiments with health checks and gRPC.
Browse files Browse the repository at this point in the history
Signed-off-by: Santiago Pericas-Geertsen <[email protected]>
  • Loading branch information
spericas committed Sep 4, 2024
1 parent b3300da commit 0486838
Show file tree
Hide file tree
Showing 5 changed files with 348 additions and 0 deletions.
13 changes: 13 additions & 0 deletions webserver/grpc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-services</artifactId>
</dependency>
<dependency>
<groupId>io.helidon.common.features</groupId>
<artifactId>helidon-common-features-api</artifactId>
Expand All @@ -72,6 +76,14 @@
<groupId>io.helidon.builder</groupId>
<artifactId>helidon-builder-api</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.microprofile.health</groupId>
<artifactId>microprofile-health-api</artifactId>
</dependency>
<dependency>
<groupId>io.helidon.microprofile.health</groupId>
<artifactId>helidon-microprofile-health</artifactId>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
Expand All @@ -86,6 +98,7 @@
<groupId>io.helidon.codegen</groupId>
<artifactId>helidon-codegen</artifactId>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (c) 2019, 2024 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.helidon.webserver.grpc;

import org.eclipse.microprofile.health.HealthCheck;
import org.eclipse.microprofile.health.HealthCheckResponse;

/**
* A simple {@link org.eclipse.microprofile.health.HealthCheck} implementation
* that always returns the same response.
*/
public class ConstantHealthCheck implements HealthCheck {

private final HealthCheckResponse response;

private ConstantHealthCheck(HealthCheckResponse response) {
this.response = response;
}

/**
* Obtain a {@link org.eclipse.microprofile.health.HealthCheck} that always returns a status of up.
*
* @param name the service name
* @return a {@link org.eclipse.microprofile.health.HealthCheck} that always returns a status of up
*/
public static HealthCheck up(String name) {
return new ConstantHealthCheck(HealthCheckResponse.named(name).up().build());
}

/**
* Obtain a {@link org.eclipse.microprofile.health.HealthCheck} that always returns a status of down.
*
* @param name the service name
* @return a {@link org.eclipse.microprofile.health.HealthCheck} that always returns a status of down
*/
public static HealthCheck down(String name) {
return new ConstantHealthCheck(HealthCheckResponse.named(name).down().build());
}

@Override
public HealthCheckResponse call() {
return response;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright (c) 2019, 2024 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.helidon.webserver.grpc;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import io.grpc.Status;
import io.grpc.health.v1.HealthCheckRequest;
import io.grpc.health.v1.HealthCheckResponse;
import io.grpc.health.v1.HealthGrpc;
import io.grpc.services.HealthStatusManager;
import io.grpc.stub.StreamObserver;
import org.eclipse.microprofile.health.HealthCheck;

/**
* An implementation of the {@link HealthGrpc} service.
*/
class HealthServiceImpl extends HealthGrpc.HealthImplBase {

/**
* A map of {@link HealthCheck}s keyed by service name.
*/
private final Map<String, HealthCheck> mapHealthChecks = new ConcurrentHashMap<>();

private HealthServiceImpl() {
// register the empty service name to represent the global health check
// see: https://github.com/grpc/grpc/blob/master/doc/health-checking.md
mapHealthChecks.put(HealthStatusManager.SERVICE_NAME_ALL_SERVICES,
ConstantHealthCheck.up(HealthStatusManager.SERVICE_NAME_ALL_SERVICES));
}

/**
* Create a {@link io.helidon.webserver.grpc.HealthServiceImpl}.
*/
static HealthServiceImpl create() {
return new HealthServiceImpl();
}

/**
* Add a {@link HealthCheck}.
*
* @param name the name of the service
* @param healthCheck the {@link HealthCheck} implementation
*/
void add(String name, HealthCheck healthCheck) {
mapHealthChecks.put(name, healthCheck);
}

/**
* Obtain the collection of registered {@link HealthCheck}s.
*
* @return the collection of registered {@link HealthCheck}s
*/
Collection<HealthCheck> healthChecks() {
return mapHealthChecks.values();
}

@Override
public void check(HealthCheckRequest request, StreamObserver<HealthCheckResponse> responseObserver) {
String service = request.getService();
HealthCheck check = mapHealthChecks.get(service);

if (check == null) {
// If no health check is registered for the requested service then respond with a not found error
String message = "Service '" + service + "' does not exist or does not have a registered health check";
responseObserver.onError(Status.NOT_FOUND.withDescription(message).asException());
} else {
responseObserver.onNext(toHealthCheckResponse(check.call()));
responseObserver.onCompleted();
}
}

private HealthCheckResponse toHealthCheckResponse(HealthCheckResponse.ServingStatus status) {
return HealthCheckResponse.newBuilder().setStatus(status).build();
}

private HealthCheckResponse toHealthCheckResponse(org.eclipse.microprofile.health.HealthCheckResponse response) {
return response.getStatus().equals(org.eclipse.microprofile.health.HealthCheckResponse.Status.UP)
? toHealthCheckResponse(HealthCheckResponse.ServingStatus.SERVING)
: toHealthCheckResponse(HealthCheckResponse.ServingStatus.NOT_SERVING);
}
}
2 changes: 2 additions & 0 deletions webserver/grpc/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@

requires io.grpc;
requires io.grpc.stub;
requires io.grpc.services;
requires com.google.protobuf;
requires microprofile.health.api;

requires transitive io.helidon.grpc.core;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* Copyright (c) 2019, 2024 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.helidon.webserver.grpc;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import io.grpc.health.v1.HealthCheckRequest;
import io.grpc.health.v1.HealthCheckResponse;
import io.grpc.stub.StreamObserver;
import org.eclipse.microprofile.health.HealthCheck;
import org.junit.jupiter.api.Test;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

public class HealthServiceImplTest {

@Test
public void shouldRequestCheckForUpService() {
HealthServiceImpl healthService = HealthServiceImpl.create();
String serviceName = "foo";
HealthCheck check = ConstantHealthCheck.up(serviceName);
HealthCheckRequest request = HealthCheckRequest.newBuilder().setService(serviceName).build();
TestStreamObserver<HealthCheckResponse> observer = new TestStreamObserver<>();
healthService.add(serviceName, check);
healthService.check(request, observer);
observer.assertComplete().assertValueCount(1);
List<HealthCheckResponse> responses = observer.values();
assertThat(responses.size(), is(1));
HealthCheckResponse response = responses.getFirst();
assertThat(response.getStatus(), is(HealthCheckResponse.ServingStatus.SERVING));
}

@Test
public void shouldRequestCheckForDownService() {
HealthServiceImpl healthService = HealthServiceImpl.create();
String serviceName = "foo";
HealthCheck check = ConstantHealthCheck.down(serviceName);
HealthCheckRequest request = HealthCheckRequest.newBuilder().setService(serviceName).build();
TestStreamObserver<HealthCheckResponse> observer = new TestStreamObserver<>();
healthService.add(serviceName, check);
healthService.check(request, observer);
observer.assertComplete().assertValueCount(1);
List<HealthCheckResponse> responses = observer.values();
assertThat(responses.size(), is(1));
HealthCheckResponse response = responses.getFirst();
assertThat(response.getStatus(), is(HealthCheckResponse.ServingStatus.NOT_SERVING));
}

@Test
public void shouldRequestCheckForGlobalService() {
HealthServiceImpl healthService = HealthServiceImpl.create();
String serviceName = "";
HealthCheck check = ConstantHealthCheck.up(serviceName);
HealthCheckRequest request = HealthCheckRequest.newBuilder().setService(serviceName).build();
TestStreamObserver<HealthCheckResponse> observer = new TestStreamObserver<>();
healthService.add(serviceName, check);
healthService.check(request, observer);
observer.assertComplete().assertValueCount(1);
List<HealthCheckResponse> responses = observer.values();
assertThat(responses.size(), is(1));
HealthCheckResponse response = responses.getFirst();
assertThat(response.getStatus(), is(HealthCheckResponse.ServingStatus.SERVING));
}

@Test
public void shouldRequestCheckWithoutServiceName() {
HealthServiceImpl healthService = HealthServiceImpl.create();
String serviceName = "";
HealthCheck check = ConstantHealthCheck.up(serviceName);
HealthCheckRequest request = HealthCheckRequest.newBuilder().build();
TestStreamObserver<HealthCheckResponse> observer = new TestStreamObserver<>();
healthService.add(serviceName, check);
healthService.check(request, observer);
observer.assertComplete().assertValueCount(1);
List<HealthCheckResponse> responses = observer.values();
assertThat(responses.size(), is(1));
HealthCheckResponse response = responses.getFirst();
assertThat(response.getStatus(), is(HealthCheckResponse.ServingStatus.SERVING));
}

@Test
public void shouldRequestCheckForUnknownService() {
HealthServiceImpl healthService = HealthServiceImpl.create();
String serviceName = "unknown";
HealthCheckRequest request = HealthCheckRequest.newBuilder().setService(serviceName).build();
TestStreamObserver<HealthCheckResponse> observer = new TestStreamObserver<>();
healthService.check(request, observer);
observer.assertError(this::isNotFoundError);
}

private boolean isNotFoundError(Throwable thrown) {
if (thrown instanceof StatusException) {
return ((StatusException) thrown).getStatus().getCode().equals(Status.NOT_FOUND.getCode());
} else if (thrown instanceof StatusRuntimeException) {
return ((StatusRuntimeException) thrown).getStatus().getCode().equals(Status.NOT_FOUND.getCode());
} else {
return false;
}
}

static class TestStreamObserver<T> implements StreamObserver<T> {

private final List<T> values = new ArrayList<>();
private final CompletableFuture<Boolean> future = new CompletableFuture<>();

@Override
public void onNext(T t) {
values.add(t);
}

@Override
public void onError(Throwable throwable) {
values.clear();
future.completeExceptionally(throwable);
}

@Override
public void onCompleted() {
future.complete(true);
}

List<T> values() {
return values;
}

void assertError(Predicate<Throwable> consumer) {
if (future.isCompletedExceptionally()) {
try {
future.get();
} catch (Exception e) {
assertThat(consumer.test(e.getCause()), is(true));
}
}
}

TestStreamObserver<T> assertComplete() {
assertThat(awaitTerminalEvent(), is(true));
return this;
}

TestStreamObserver<T> assertValueCount(int count) {
assertThat(values.size(), is(count));
return this;
}

private boolean awaitTerminalEvent() {
try {
future.get(10, TimeUnit.SECONDS);
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}

0 comments on commit 0486838

Please sign in to comment.