diff --git a/src/main/java/com/ge/predix/eventhub/client/Client.java b/src/main/java/com/ge/predix/eventhub/client/Client.java index d724394..0c91e18 100644 --- a/src/main/java/com/ge/predix/eventhub/client/Client.java +++ b/src/main/java/com/ge/predix/eventhub/client/Client.java @@ -47,6 +47,7 @@ public class Client implements AutoCloseable { private Timer healthCheckTimer; private Thread healthThread; private CountDownLatch timerStart; + private final StreamObserver healthCheckObserver; protected EventHubLogger ehLogger; protected PublishClient publishClient; @@ -110,26 +111,34 @@ default void onMessage(Object o){ /** - * Creates a new client from the given configuration + * Creates a new client with default health check observer from the given configuration * * @param configuration Configuration that has Event Hub details and preferences */ public Client(EventHubConfiguration configuration) { + this(configuration, null); + } + + /** + * Creates a new client from the given configuration with specified health check observer + * + * @param configuration Configuration that has Event Hub details and preferences + * @param healthCheckObserver health check observer implementation to use. If {@code null}, uses default implementation + */ + public Client(EventHubConfiguration configuration, StreamObserver healthCheckObserver) { this.ehLogger = new EventHubLogger(this.getClass(), configuration); this.configuration = configuration; ehLogger.log(Level.INFO, "starting EventHub client"); - + this.healthCheckObserver = healthCheckObserver != null ? healthCheckObserver : new DefaultHealthCheckObserver() ; JSONObject mandatoryClientLog = new JSONObject(); mandatoryClientLog.put("sdk_version", sdkVersionString); mandatoryClientLog.put("zoneID", this.configuration.getZoneID()); mandatoryClientLog.put("runtimeID", this.configuration.getLoggerConfiguration().getRuntimeId()); System.out.println(mandatoryClientLog); - buildChannel(); startHealthChecker(); initSubscribeClient(); initPublishClient(); - } /** @@ -187,29 +196,6 @@ private void startHealthChecker() { MSG_KEY, "starting health checker"); final HealthGrpc.HealthStub healthStub = HealthGrpc.newStub(originChannel); final HealthCheckRequest request = HealthCheckRequest.newBuilder().setService(healthCheckUrl).build(); - final StreamObserver observer = new StreamObserver() { - public void onNext(HealthCheckResponse healthCheckResponse) { - ehLogger.log( Level.FINEST, - CLIENT_CHANNEL_MSG, - MSG_KEY, "received health check response" + healthCheckResponse); - } - - public void onError(Throwable throwable) { - // Extract the Error Status from the cause throwable via - // io.grpc.Status.fromThrowable(throwable) , this helps for debugging - ehLogger.log( Level.WARNING, - CLIENT_CHANNEL_ERROR, - MSG_KEY, "error in health check", - EXCEPTION_KEY,io.grpc.Status.fromThrowable(throwable) - ); - } - - public void onCompleted() { - ehLogger.log( Level.FINE, - CLIENT_CHANNEL_MSG, - MSG_KEY, "health check channel complete"); - } - }; final TimerTask checkHealth = new TimerTask() { @Override @@ -218,7 +204,7 @@ public void run() { CLIENT_CHANNEL_MSG, MSG_KEY, "pinging event hub for health check"); try { - healthStub.check(request, observer); + healthStub.check(request, healthCheckObserver); } catch(Exception e){ ehLogger.log( Level.SEVERE, @@ -688,4 +674,31 @@ public String toString(){ "configuration", configuration.toString() ).toString(); } + + /** + * Default health check observer implementation + */ + protected class DefaultHealthCheckObserver implements StreamObserver { + public void onNext(HealthCheckResponse healthCheckResponse) { + ehLogger.log( Level.FINEST, + CLIENT_CHANNEL_MSG, + MSG_KEY, "received health check response" + healthCheckResponse); + } + + public void onError(Throwable throwable) { + // Extract the Error Status from the cause throwable via + // io.grpc.Status.fromThrowable(throwable) , this helps for debugging + ehLogger.log( Level.WARNING, + CLIENT_CHANNEL_ERROR, + MSG_KEY, "error in health check", + EXCEPTION_KEY, Status.fromThrowable(throwable) + ); + } + + public void onCompleted() { + ehLogger.log( Level.FINE, + CLIENT_CHANNEL_MSG, + MSG_KEY, "health check channel complete"); + } + } }