Skip to content

Commit

Permalink
[eclipse-hono#2029] Fix issues caused by different vertx contexts bei…
Browse files Browse the repository at this point in the history
…ng used.

The CommandRouterServiceImpl is no Verticle anymore, therefore
there is no issue anymore with CommandRouterAmqpServer requests
ending up being handled on the wrong vert.x context.

Signed-off-by: Carsten Lohmann <[email protected]>
  • Loading branch information
calohmn committed Nov 26, 2020
1 parent 13d87d1 commit 275332b
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import java.util.Objects;

import org.eclipse.hono.commandrouter.impl.CommandRouterServiceImpl;
import org.eclipse.hono.commandrouter.infinispan.EmbeddedCacheConfig;
import org.eclipse.hono.commandrouter.infinispan.RemoteCacheConfig;
import org.eclipse.hono.service.AbstractApplication;
Expand All @@ -28,7 +27,6 @@
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Import;

import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Verticle;
Expand All @@ -45,21 +43,8 @@
@EnableAutoConfiguration
public class Application extends AbstractApplication {

private CommandRouterServiceImpl serviceImplementation;
private AuthenticationService authService;

/**
* Sets the Command Router service implementation.
*
* @param service The service implementation.
* @throws NullPointerException if service is {@code null}.
*/
@Autowired
public void setServiceImplementation(final CommandRouterServiceImpl service) {
this.serviceImplementation = Objects.requireNonNull(service);
log.info("using service implementation [{}]", service.getClass().getName());
}

/**
* Sets the service to use for authenticating clients.
*
Expand All @@ -82,7 +67,7 @@ public void setAuthenticationService(final AuthenticationService service) {
@Override
protected Future<?> deployRequiredVerticles(final int maxInstances) {

return CompositeFuture.all(deployVerticle(serviceImplementation), deployVerticle(authService));
return deployVerticle(authService);
}

private Future<String> deployVerticle(final Object component) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.eclipse.hono.deviceconnection.infinispan.client.BasicCache;
import org.eclipse.hono.deviceconnection.infinispan.client.CacheBasedDeviceConnectionInfo;
import org.eclipse.hono.deviceconnection.infinispan.client.CommonCacheConfig;
import org.eclipse.hono.service.HealthCheckProvider;
import org.eclipse.hono.service.HealthCheckServer;
import org.eclipse.hono.service.VertxBasedHealthCheckServer;
import org.eclipse.hono.service.amqp.AmqpEndpoint;
Expand Down Expand Up @@ -63,6 +64,7 @@
import io.opentracing.noop.NoopTracerFactory;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.ext.healthchecks.HealthCheckHandler;

/**
* Spring Boot configuration for the Command Router service.
Expand Down Expand Up @@ -172,13 +174,28 @@ public ObjectFactoryCreatingFactoryBean amqpServerFactory() {
/**
* Creates a new instance of an AMQP 1.0 protocol handler for Hono's <em>Command Router</em> API.
*
* @param service The service instance to delegate to.
* @return The handler.
*/
@Bean
@Scope("prototype")
public AmqpEndpoint commandRouterAmqpEndpoint(final CommandRouterService service) {
return new DelegatingCommandRouterAmqpEndpoint<>(vertx(), service);
public AmqpEndpoint commandRouterAmqpEndpoint() {
final CommandRouterService commandRouterService = commandRouterService();
return new DelegatingCommandRouterAmqpEndpoint<>(vertx(), commandRouterService) {

@Override
public void registerLivenessChecks(final HealthCheckHandler handler) {
if (commandRouterService instanceof HealthCheckProvider) {
((HealthCheckProvider) commandRouterService).registerLivenessChecks(handler);
}
}

@Override
public void registerReadinessChecks(final HealthCheckHandler handler) {
if (commandRouterService instanceof HealthCheckProvider) {
((HealthCheckProvider) commandRouterService).registerReadinessChecks(handler);
}
}
};
}

/**
Expand All @@ -198,7 +215,8 @@ public CommandRouterServiceConfigProperties commandRouterServiceConfigProperties
* @return The service implementation.
*/
@Bean
public CommandRouterServiceImpl commandRouterService() {
@Scope("prototype")
public CommandRouterService commandRouterService() {
return new CommandRouterServiceImpl();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.eclipse.hono.service.commandrouter.CommandRouterResult;
import org.eclipse.hono.service.commandrouter.CommandRouterService;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.ConfigurationSupportingVerticle;
import org.eclipse.hono.util.Lifecycle;
import org.eclipse.hono.util.RegistrationAssertion;
import org.eclipse.hono.util.RegistrationConstants;
Expand All @@ -48,7 +47,6 @@
import io.vertx.core.CompositeFuture;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.healthchecks.HealthCheckHandler;
Expand All @@ -57,8 +55,7 @@
/**
* An implementation of Hono's <em>Command Router</em> API.
*/
public class CommandRouterServiceImpl extends ConfigurationSupportingVerticle<CommandRouterServiceConfigProperties>
implements CommandRouterService, HealthCheckProvider {
public class CommandRouterServiceImpl implements CommandRouterService, HealthCheckProvider, Lifecycle {

private static final Logger LOG = LoggerFactory.getLogger(CommandRouterServiceImpl.class);

Expand All @@ -67,11 +64,16 @@ public class CommandRouterServiceImpl extends ConfigurationSupportingVerticle<Co
private CommandConsumerFactory commandConsumerFactory;
private CommandTargetMapper commandTargetMapper;
private Tracer tracer = NoopTracerFactory.create();
/**
* Vert.x context that this service has been started in.
*/
private Context context;

private CommandRouterServiceConfigProperties config;

@Autowired
@Override
public void setConfig(final CommandRouterServiceConfigProperties configuration) {
setSpecificConfig(configuration);
this.config = configuration;
}

/**
Expand Down Expand Up @@ -136,62 +138,64 @@ public final void setCommandTargetMapper(final CommandTargetMapper commandTarget
}

@Override
public void start(final Promise<Void> startPromise) throws Exception {

public Future<Void> start() {
context = Vertx.currentContext();
if (context == null) {
return Future.failedFuture(new IllegalStateException("Service must be started in a Vert.x context"));
}
if (registrationClient == null) {
startPromise.fail(new IllegalStateException("Device Registration client must be set"));
return Future.failedFuture(new IllegalStateException("Device Registration client must be set"));
} else if (deviceConnectionInfo == null) {
startPromise.fail(new IllegalStateException("Device Connection info client must be set"));
} else {
startServiceClient(registrationClient, "Device Registration service");
if (deviceConnectionInfo instanceof Lifecycle) {
startServiceClient((Lifecycle) deviceConnectionInfo, "Device Connection info");
}
startServiceClient(commandConsumerFactory, "Command & Control consumer factory");

// initialize components dependent on the above clientFactories
commandTargetMapper.initialize(new CommandTargetMapper.CommandTargetMapperContext() {

@Override
public Future<List<String>> getViaGateways(
final String tenant,
final String deviceId,
final SpanContext context) {

Objects.requireNonNull(tenant);
Objects.requireNonNull(deviceId);

return registrationClient.assertRegistration(tenant, deviceId, null, context)
.map(RegistrationAssertion::getAuthorizedGateways);
}

@Override
public Future<JsonObject> getCommandHandlingAdapterInstances(
final String tenant,
final String deviceId,
final List<String> viaGateways,
final SpanContext context) {

Objects.requireNonNull(tenant);
Objects.requireNonNull(deviceId);
Objects.requireNonNull(viaGateways);

final Span span = TracingHelper.buildChildSpan(tracer, context, "getCommandHandlingAdapterInstances",
getClass().getSimpleName())
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT)
.start();
return deviceConnectionInfo
.getCommandHandlingAdapterInstances(tenant, deviceId, new HashSet<>(viaGateways), span);
}
});
commandConsumerFactory.initialize(commandTargetMapper);

startPromise.complete();
return Future.failedFuture(new IllegalStateException("Device Connection info client must be set"));
}

startServiceClient(registrationClient, "Device Registration service");
if (deviceConnectionInfo instanceof Lifecycle) {
startServiceClient((Lifecycle) deviceConnectionInfo, "Device Connection info");
}
startServiceClient(commandConsumerFactory, "Command & Control consumer factory");

// initialize components dependent on the above clientFactories
commandTargetMapper.initialize(new CommandTargetMapper.CommandTargetMapperContext() {

@Override
public Future<List<String>> getViaGateways(
final String tenant,
final String deviceId,
final SpanContext context) {

Objects.requireNonNull(tenant);
Objects.requireNonNull(deviceId);

return registrationClient.assertRegistration(tenant, deviceId, null, context)
.map(RegistrationAssertion::getAuthorizedGateways);
}

@Override
public Future<JsonObject> getCommandHandlingAdapterInstances(
final String tenant,
final String deviceId,
final List<String> viaGateways,
final SpanContext context) {

Objects.requireNonNull(tenant);
Objects.requireNonNull(deviceId);
Objects.requireNonNull(viaGateways);

final Span span = TracingHelper.buildChildSpan(tracer, context, "getCommandHandlingAdapterInstances",
getClass().getSimpleName())
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT)
.start();
return deviceConnectionInfo
.getCommandHandlingAdapterInstances(tenant, deviceId, new HashSet<>(viaGateways), span);
}
});
commandConsumerFactory.initialize(commandTargetMapper);
return Future.succeededFuture();
}

@Override
public void stop(final Promise<Void> stopPromise) throws Exception {
public Future<Void> stop() {
LOG.info("stopping command router");

@SuppressWarnings("rawtypes")
Expand All @@ -202,16 +206,15 @@ public void stop(final Promise<Void> stopPromise) throws Exception {
}
results.add(stopServiceClient(commandConsumerFactory));

CompositeFuture.all(results)
return CompositeFuture.all(results)
.recover(t -> {
LOG.info("error while stopping command router", t);
return Future.failedFuture(t);
})
.map(ok -> {
LOG.info("successfully stopped command router");
return (Void) null;
})
.onComplete(stopPromise);
});
}

/**
Expand Down Expand Up @@ -327,9 +330,9 @@ public void registerLivenessChecks(final HealthCheckHandler handler) {
}

/**
* Registers a health check which tries to run an action on the protocol adapter context.
* Registers a health check which tries to run an action on the context that this service was started in.
* <p>
* If the protocol adapter vert.x event loop is blocked, the health check procedure will not complete
* If the vert.x event loop of that context is blocked, the health check procedure will not complete
* with OK status within the defined timeout.
*
* @param handler The health check handler to register the checks with.
Expand All @@ -338,7 +341,7 @@ protected void registerEventLoopBlockedCheck(final HealthCheckHandler handler) {

handler.register(
"event-loop-blocked-check",
getConfig().getEventLoopBlockedCheckTimeout(),
config.getEventLoopBlockedCheckTimeout(),
procedure -> {
final Context currentContext = Vertx.currentContext();

Expand Down

0 comments on commit 275332b

Please sign in to comment.