From 275332b20617c850fbc639a77ff06f5b6d6283c2 Mon Sep 17 00:00:00 2001 From: Carsten Lohmann Date: Mon, 23 Nov 2020 17:50:22 +0100 Subject: [PATCH] [#2029] Fix issues caused by different vertx contexts being used. The CommandRouterServiceImpl is no Verticle anymore, therefore there is no issue anymore with CommandRouterAmqpServer requests ending up being handled on the wrong vert.x context. Signed-off-by: Carsten Lohmann --- .../hono/commandrouter/Application.java | 17 +-- .../hono/commandrouter/ApplicationConfig.java | 26 +++- .../impl/CommandRouterServiceImpl.java | 127 +++++++++--------- 3 files changed, 88 insertions(+), 82 deletions(-) diff --git a/services/command-router/src/main/java/org/eclipse/hono/commandrouter/Application.java b/services/command-router/src/main/java/org/eclipse/hono/commandrouter/Application.java index 699cd7290b..b46c8c0328 100644 --- a/services/command-router/src/main/java/org/eclipse/hono/commandrouter/Application.java +++ b/services/command-router/src/main/java/org/eclipse/hono/commandrouter/Application.java @@ -15,7 +15,6 @@ import java.util.Objects; -import org.eclipse.hono.commandrouter.impl.CommandRouterServiceImpl; import org.eclipse.hono.commandrouter.infinispan.EmbeddedCacheConfig; import org.eclipse.hono.commandrouter.infinispan.RemoteCacheConfig; import org.eclipse.hono.service.AbstractApplication; @@ -28,7 +27,6 @@ import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Import; -import io.vertx.core.CompositeFuture; import io.vertx.core.Future; import io.vertx.core.Promise; import io.vertx.core.Verticle; @@ -45,21 +43,8 @@ @EnableAutoConfiguration public class Application extends AbstractApplication { - private CommandRouterServiceImpl serviceImplementation; private AuthenticationService authService; - /** - * Sets the Command Router service implementation. - * - * @param service The service implementation. - * @throws NullPointerException if service is {@code null}. - */ - @Autowired - public void setServiceImplementation(final CommandRouterServiceImpl service) { - this.serviceImplementation = Objects.requireNonNull(service); - log.info("using service implementation [{}]", service.getClass().getName()); - } - /** * Sets the service to use for authenticating clients. * @@ -82,7 +67,7 @@ public void setAuthenticationService(final AuthenticationService service) { @Override protected Future deployRequiredVerticles(final int maxInstances) { - return CompositeFuture.all(deployVerticle(serviceImplementation), deployVerticle(authService)); + return deployVerticle(authService); } private Future deployVerticle(final Object component) { diff --git a/services/command-router/src/main/java/org/eclipse/hono/commandrouter/ApplicationConfig.java b/services/command-router/src/main/java/org/eclipse/hono/commandrouter/ApplicationConfig.java index 139e83e595..7dcd4822af 100644 --- a/services/command-router/src/main/java/org/eclipse/hono/commandrouter/ApplicationConfig.java +++ b/services/command-router/src/main/java/org/eclipse/hono/commandrouter/ApplicationConfig.java @@ -33,6 +33,7 @@ import org.eclipse.hono.deviceconnection.infinispan.client.BasicCache; import org.eclipse.hono.deviceconnection.infinispan.client.CacheBasedDeviceConnectionInfo; import org.eclipse.hono.deviceconnection.infinispan.client.CommonCacheConfig; +import org.eclipse.hono.service.HealthCheckProvider; import org.eclipse.hono.service.HealthCheckServer; import org.eclipse.hono.service.VertxBasedHealthCheckServer; import org.eclipse.hono.service.amqp.AmqpEndpoint; @@ -63,6 +64,7 @@ import io.opentracing.noop.NoopTracerFactory; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; +import io.vertx.ext.healthchecks.HealthCheckHandler; /** * Spring Boot configuration for the Command Router service. @@ -172,13 +174,28 @@ public ObjectFactoryCreatingFactoryBean amqpServerFactory() { /** * Creates a new instance of an AMQP 1.0 protocol handler for Hono's Command Router API. * - * @param service The service instance to delegate to. * @return The handler. */ @Bean @Scope("prototype") - public AmqpEndpoint commandRouterAmqpEndpoint(final CommandRouterService service) { - return new DelegatingCommandRouterAmqpEndpoint<>(vertx(), service); + public AmqpEndpoint commandRouterAmqpEndpoint() { + final CommandRouterService commandRouterService = commandRouterService(); + return new DelegatingCommandRouterAmqpEndpoint<>(vertx(), commandRouterService) { + + @Override + public void registerLivenessChecks(final HealthCheckHandler handler) { + if (commandRouterService instanceof HealthCheckProvider) { + ((HealthCheckProvider) commandRouterService).registerLivenessChecks(handler); + } + } + + @Override + public void registerReadinessChecks(final HealthCheckHandler handler) { + if (commandRouterService instanceof HealthCheckProvider) { + ((HealthCheckProvider) commandRouterService).registerReadinessChecks(handler); + } + } + }; } /** @@ -198,7 +215,8 @@ public CommandRouterServiceConfigProperties commandRouterServiceConfigProperties * @return The service implementation. */ @Bean - public CommandRouterServiceImpl commandRouterService() { + @Scope("prototype") + public CommandRouterService commandRouterService() { return new CommandRouterServiceImpl(); } diff --git a/services/command-router/src/main/java/org/eclipse/hono/commandrouter/impl/CommandRouterServiceImpl.java b/services/command-router/src/main/java/org/eclipse/hono/commandrouter/impl/CommandRouterServiceImpl.java index 785a2904c7..290a5daa0c 100644 --- a/services/command-router/src/main/java/org/eclipse/hono/commandrouter/impl/CommandRouterServiceImpl.java +++ b/services/command-router/src/main/java/org/eclipse/hono/commandrouter/impl/CommandRouterServiceImpl.java @@ -31,7 +31,6 @@ import org.eclipse.hono.service.commandrouter.CommandRouterResult; import org.eclipse.hono.service.commandrouter.CommandRouterService; import org.eclipse.hono.tracing.TracingHelper; -import org.eclipse.hono.util.ConfigurationSupportingVerticle; import org.eclipse.hono.util.Lifecycle; import org.eclipse.hono.util.RegistrationAssertion; import org.eclipse.hono.util.RegistrationConstants; @@ -48,7 +47,6 @@ import io.vertx.core.CompositeFuture; import io.vertx.core.Context; import io.vertx.core.Future; -import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.json.JsonObject; import io.vertx.ext.healthchecks.HealthCheckHandler; @@ -57,8 +55,7 @@ /** * An implementation of Hono's Command Router API. */ -public class CommandRouterServiceImpl extends ConfigurationSupportingVerticle - implements CommandRouterService, HealthCheckProvider { +public class CommandRouterServiceImpl implements CommandRouterService, HealthCheckProvider, Lifecycle { private static final Logger LOG = LoggerFactory.getLogger(CommandRouterServiceImpl.class); @@ -67,11 +64,16 @@ public class CommandRouterServiceImpl extends ConfigurationSupportingVerticle startPromise) throws Exception { - + public Future start() { + context = Vertx.currentContext(); + if (context == null) { + return Future.failedFuture(new IllegalStateException("Service must be started in a Vert.x context")); + } if (registrationClient == null) { - startPromise.fail(new IllegalStateException("Device Registration client must be set")); + return Future.failedFuture(new IllegalStateException("Device Registration client must be set")); } else if (deviceConnectionInfo == null) { - startPromise.fail(new IllegalStateException("Device Connection info client must be set")); - } else { - startServiceClient(registrationClient, "Device Registration service"); - if (deviceConnectionInfo instanceof Lifecycle) { - startServiceClient((Lifecycle) deviceConnectionInfo, "Device Connection info"); - } - startServiceClient(commandConsumerFactory, "Command & Control consumer factory"); - - // initialize components dependent on the above clientFactories - commandTargetMapper.initialize(new CommandTargetMapper.CommandTargetMapperContext() { - - @Override - public Future> getViaGateways( - final String tenant, - final String deviceId, - final SpanContext context) { - - Objects.requireNonNull(tenant); - Objects.requireNonNull(deviceId); - - return registrationClient.assertRegistration(tenant, deviceId, null, context) - .map(RegistrationAssertion::getAuthorizedGateways); - } - - @Override - public Future getCommandHandlingAdapterInstances( - final String tenant, - final String deviceId, - final List viaGateways, - final SpanContext context) { - - Objects.requireNonNull(tenant); - Objects.requireNonNull(deviceId); - Objects.requireNonNull(viaGateways); - - final Span span = TracingHelper.buildChildSpan(tracer, context, "getCommandHandlingAdapterInstances", - getClass().getSimpleName()) - .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT) - .start(); - return deviceConnectionInfo - .getCommandHandlingAdapterInstances(tenant, deviceId, new HashSet<>(viaGateways), span); - } - }); - commandConsumerFactory.initialize(commandTargetMapper); - - startPromise.complete(); + return Future.failedFuture(new IllegalStateException("Device Connection info client must be set")); + } + + startServiceClient(registrationClient, "Device Registration service"); + if (deviceConnectionInfo instanceof Lifecycle) { + startServiceClient((Lifecycle) deviceConnectionInfo, "Device Connection info"); } + startServiceClient(commandConsumerFactory, "Command & Control consumer factory"); + + // initialize components dependent on the above clientFactories + commandTargetMapper.initialize(new CommandTargetMapper.CommandTargetMapperContext() { + + @Override + public Future> getViaGateways( + final String tenant, + final String deviceId, + final SpanContext context) { + + Objects.requireNonNull(tenant); + Objects.requireNonNull(deviceId); + + return registrationClient.assertRegistration(tenant, deviceId, null, context) + .map(RegistrationAssertion::getAuthorizedGateways); + } + + @Override + public Future getCommandHandlingAdapterInstances( + final String tenant, + final String deviceId, + final List viaGateways, + final SpanContext context) { + + Objects.requireNonNull(tenant); + Objects.requireNonNull(deviceId); + Objects.requireNonNull(viaGateways); + + final Span span = TracingHelper.buildChildSpan(tracer, context, "getCommandHandlingAdapterInstances", + getClass().getSimpleName()) + .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT) + .start(); + return deviceConnectionInfo + .getCommandHandlingAdapterInstances(tenant, deviceId, new HashSet<>(viaGateways), span); + } + }); + commandConsumerFactory.initialize(commandTargetMapper); + return Future.succeededFuture(); } @Override - public void stop(final Promise stopPromise) throws Exception { + public Future stop() { LOG.info("stopping command router"); @SuppressWarnings("rawtypes") @@ -202,7 +206,7 @@ public void stop(final Promise stopPromise) throws Exception { } results.add(stopServiceClient(commandConsumerFactory)); - CompositeFuture.all(results) + return CompositeFuture.all(results) .recover(t -> { LOG.info("error while stopping command router", t); return Future.failedFuture(t); @@ -210,8 +214,7 @@ public void stop(final Promise stopPromise) throws Exception { .map(ok -> { LOG.info("successfully stopped command router"); return (Void) null; - }) - .onComplete(stopPromise); + }); } /** @@ -327,9 +330,9 @@ public void registerLivenessChecks(final HealthCheckHandler handler) { } /** - * Registers a health check which tries to run an action on the protocol adapter context. + * Registers a health check which tries to run an action on the context that this service was started in. *

- * If the protocol adapter vert.x event loop is blocked, the health check procedure will not complete + * If the vert.x event loop of that context is blocked, the health check procedure will not complete * with OK status within the defined timeout. * * @param handler The health check handler to register the checks with. @@ -338,7 +341,7 @@ protected void registerEventLoopBlockedCheck(final HealthCheckHandler handler) { handler.register( "event-loop-blocked-check", - getConfig().getEventLoopBlockedCheckTimeout(), + config.getEventLoopBlockedCheckTimeout(), procedure -> { final Context currentContext = Vertx.currentContext();