From 4f31d38e46669bcb2253fd133c67811fd8411341 Mon Sep 17 00:00:00 2001 From: Carsten Lohmann Date: Fri, 20 Nov 2020 14:51:37 +0100 Subject: [PATCH] [#2029] Use Command Router in adapters and integration tests. This adds a org.eclipse.hono.adapter.client.command.CommandConsumerFactory implementation that uses the new Command Router component. For the integration tests, a new maven profile 'command-router' is added to let the tests run using the Command Router component. The GitHub action workflow has been adapted to use that profile in the test-run that uses the jdbc device registry (so that the other test-runs still use the old command routing mechanism). Signed-off-by: Carsten Lohmann --- .github/workflows/ci.yml | 9 +- .../amqp/ProtonBasedCommandContext.java | 101 ++++++ ...mmandRouterCommandConsumerFactoryImpl.java | 325 ++++++++++++++++++ ...asedDelegatingCommandConsumerFactory.java} | 125 +------ .../ProtonBasedDeviceConnectionClient.java | 2 +- .../src/main/deploy/example-permissions.json | 8 + .../src/main/sandbox/sandbox-permissions.json | 8 + jenkins/Hono-Deploy-Eclipse-Pipeline.groovy | 2 +- .../Hono-Deploy-Maven-Central-Pipeline.groovy | 2 +- jenkins/Hono-Nightly-Pipeline.groovy | 2 +- push_hono_images.sh | 3 +- .../AbstractProtocolAdapterApplication.java | 20 +- .../hono/service/AbstractAdapterConfig.java | 86 ++++- .../service/AbstractProtocolAdapterBase.java | 4 +- tests/pom.xml | 107 ++++++ tests/readme.md | 8 +- .../hono/tests/IntegrationTestSupport.java | 20 ++ .../registry/DeviceConnectionApiTests.java | 8 + tests/src/test/resources/amqp/application.yml | 6 +- .../src/test/resources/auth/permissions.json | 26 ++ tests/src/test/resources/coap/application.yml | 4 +- .../resources/commandrouter/application.yml | 43 +++ .../resources/commandrouter/cache-config.xml | 21 ++ .../commandrouter/logback-spring.xml | 61 ++++ tests/src/test/resources/http/application.yml | 4 +- tests/src/test/resources/mqtt/application.yml | 4 +- .../resources/qpid/qdrouterd-with-broker.json | 2 +- 27 files changed, 860 insertions(+), 151 deletions(-) create mode 100644 clients/adapter-amqp/src/main/java/org/eclipse/hono/adapter/client/command/amqp/ProtonBasedCommandContext.java create mode 100644 clients/adapter-amqp/src/main/java/org/eclipse/hono/adapter/client/command/amqp/ProtonBasedCommandRouterCommandConsumerFactoryImpl.java rename clients/adapter-amqp/src/main/java/org/eclipse/hono/adapter/client/command/amqp/{ProtonBasedCommandConsumerFactory.java => ProtonBasedDelegatingCommandConsumerFactory.java} (69%) create mode 100644 tests/src/test/resources/commandrouter/application.yml create mode 100644 tests/src/test/resources/commandrouter/cache-config.xml create mode 100644 tests/src/test/resources/commandrouter/logback-spring.xml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d8503237364..ae1d91c215b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -14,7 +14,7 @@ # The job uses a matrix for the distinct device registry implementations. Thus, # for each registry implementation, the workflow is run on a separate VM. -name: Build Hono's 'container images and run integration tests +name: Build and run integration tests on: [push,pull_request] @@ -25,7 +25,12 @@ jobs: strategy: matrix: device-registry: [file,jdbc,mongodb] + include: + # let the jdbc test-run use the command-router component + - device-registry: jdbc + commandrouting-mode: commandrouter + name: Use ${{ matrix.device-registry }}-registry [${{ matrix.commandrouting-mode }}] steps: - uses: actions/checkout@v2 - name: Cache local Maven repository @@ -47,4 +52,4 @@ jobs: with: java-version: '11' - name: Build all components (incl. unit tests) and run integration tests - run: mvn install -B -e -DcreateJavadoc=true -DCI=$CI -Dhono.deviceregistry.type=${{ matrix.device-registry }} -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -Pbuild-docker-image,run-tests + run: mvn install -B -e -DcreateJavadoc=true -DCI=$CI -Dhono.deviceregistry.type=${{ matrix.device-registry }} -Dhono.commandrouting.mode=${{ matrix.commandrouting-mode }} -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -Pbuild-docker-image,run-tests diff --git a/clients/adapter-amqp/src/main/java/org/eclipse/hono/adapter/client/command/amqp/ProtonBasedCommandContext.java b/clients/adapter-amqp/src/main/java/org/eclipse/hono/adapter/client/command/amqp/ProtonBasedCommandContext.java new file mode 100644 index 00000000000..860d0406cd6 --- /dev/null +++ b/clients/adapter-amqp/src/main/java/org/eclipse/hono/adapter/client/command/amqp/ProtonBasedCommandContext.java @@ -0,0 +1,101 @@ +/******************************************************************************* + * Copyright (c) 2020 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + *******************************************************************************/ + +package org.eclipse.hono.adapter.client.command.amqp; + +import java.util.Objects; + +import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.eclipse.hono.adapter.client.command.Command; +import org.eclipse.hono.adapter.client.command.CommandContext; +import org.eclipse.hono.util.Constants; + +import io.opentracing.Span; +import io.opentracing.SpanContext; +import io.vertx.proton.ProtonHelper; + +/** + * A wrapper around a legacy {@link org.eclipse.hono.client.CommandContext}. + */ +public class ProtonBasedCommandContext implements CommandContext { + + private final org.eclipse.hono.client.CommandContext ctx; + private final ProtonBasedCommand command; + + /** + * Creates a new command context. + * + * @param context The legacy command context to wrap. + * @throws NullPointerException if context is {@code null}. + */ + public ProtonBasedCommandContext(final org.eclipse.hono.client.CommandContext context) { + this.ctx = Objects.requireNonNull(context); + this.command = new ProtonBasedCommand(context.getCommand()); + } + + @Override + public void logCommandToSpan(final Span span) { + command.logToSpan(span); + } + + @Override + public Command getCommand() { + return command; + } + + @Override + public void accept() { + ctx.accept(); + } + + @Override + public void release() { + ctx.release(); + } + + @Override + public void modify(final boolean deliveryFailed, final boolean undeliverableHere) { + ctx.modify(deliveryFailed, undeliverableHere); + } + + @Override + public void reject(final String cause) { + final ErrorCondition error = ProtonHelper.condition(Constants.AMQP_BAD_REQUEST, cause); + ctx.reject(error); + } + + @Override + public T get(final String key) { + return ctx.get(key); + } + + @Override + public T get(final String key, final T defaultValue) { + return ctx.get(key, defaultValue); + } + + @Override + public void put(final String key, final Object value) { + ctx.put(key, value); + } + + @Override + public SpanContext getTracingContext() { + return ctx.getTracingContext(); + } + + @Override + public Span getTracingSpan() { + return ctx.getTracingSpan(); + } +} diff --git a/clients/adapter-amqp/src/main/java/org/eclipse/hono/adapter/client/command/amqp/ProtonBasedCommandRouterCommandConsumerFactoryImpl.java b/clients/adapter-amqp/src/main/java/org/eclipse/hono/adapter/client/command/amqp/ProtonBasedCommandRouterCommandConsumerFactoryImpl.java new file mode 100644 index 00000000000..be41d25fe1d --- /dev/null +++ b/clients/adapter-amqp/src/main/java/org/eclipse/hono/adapter/client/command/amqp/ProtonBasedCommandRouterCommandConsumerFactoryImpl.java @@ -0,0 +1,325 @@ +/******************************************************************************* + * Copyright (c) 2020 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + *******************************************************************************/ + +package org.eclipse.hono.adapter.client.command.amqp; + +import java.net.HttpURLConnection; +import java.time.Duration; +import java.time.Instant; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.eclipse.hono.adapter.client.amqp.AbstractServiceClient; +import org.eclipse.hono.adapter.client.command.CommandConsumer; +import org.eclipse.hono.adapter.client.command.CommandConsumerFactory; +import org.eclipse.hono.adapter.client.command.CommandContext; +import org.eclipse.hono.adapter.client.command.CommandRouterClient; +import org.eclipse.hono.client.ClientErrorException; +import org.eclipse.hono.client.HonoConnection; +import org.eclipse.hono.client.SendMessageSampler; +import org.eclipse.hono.client.ServiceInvocationException; +import org.eclipse.hono.client.impl.AdapterInstanceCommandHandler; +import org.eclipse.hono.client.impl.CommandHandlerWrapper; +import org.eclipse.hono.config.ProtocolAdapterProperties; +import org.eclipse.hono.util.CommandConstants; +import org.eclipse.hono.util.Constants; + +import io.opentracing.SpanContext; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.eventbus.Message; +import io.vertx.proton.ProtonQoS; +import io.vertx.proton.ProtonReceiver; + +/** + * A vertx-proton based factory for creating consumers of command messages received via the + * AMQP 1.0 Messaging Network. + *

+ * This implementation uses the Command Router service and receives commands forwarded by the Command Router + * component on a link with an address containing the protocol adapter instance id. + * + */ +public class ProtonBasedCommandRouterCommandConsumerFactoryImpl extends AbstractServiceClient implements CommandConsumerFactory { + + private static final int RECREATE_CONSUMER_DELAY = 20; + + /** + * Identifier that has to be unique to this factory instance. + * Will be used to represent the protocol adapter instance that this factory instance is used in, + * when registering command handlers with the command router service client. + */ + private final String adapterInstanceId; + private final AdapterInstanceCommandHandler adapterInstanceCommandHandler; + private final AtomicBoolean recreatingConsumer = new AtomicBoolean(false); + private final AtomicBoolean tryAgainRecreatingConsumer = new AtomicBoolean(false); + + private final CommandRouterClient commandRouterClient; + private ProtonReceiver adapterSpecificConsumer; + + /** + * Creates a new factory for an existing connection. + * + * @param connection The connection to the AMQP network. + * @param samplerFactory The sampler factory to use. + * @param adapterConfig The protocol adapter's configuration properties. + * @param commandRouterClient The client to use for accessing the command router service. + * @throws NullPointerException if any of the parameters is {@code null}. + */ + public ProtonBasedCommandRouterCommandConsumerFactoryImpl( + final HonoConnection connection, + final SendMessageSampler.Factory samplerFactory, + final ProtocolAdapterProperties adapterConfig, + final CommandRouterClient commandRouterClient) { + super(connection, samplerFactory, adapterConfig); + this.commandRouterClient = Objects.requireNonNull(commandRouterClient); + + // the container id contains a UUID therefore it can be used as a unique adapter instance id + adapterInstanceId = connection.getContainerId(); + adapterInstanceCommandHandler = new AdapterInstanceCommandHandler(connection.getTracer(), adapterInstanceId); + } + + @Override + public Future start() { + return super.start() + .onComplete(v -> { + connection.getVertx().eventBus().consumer(Constants.EVENT_BUS_ADDRESS_TENANT_TIMED_OUT, + this::handleTenantTimeout); + connection.addReconnectListener(c -> recreateConsumer()); + // trigger creation of adapter specific consumer link (with retry if failed) + recreateConsumer(); + }); + } + + @Override + protected void onDisconnect() { + adapterSpecificConsumer = null; + } + + /** + * {@inheritDoc} + */ + @Override + public final Future createCommandConsumer( + final String tenantId, + final String deviceId, + final Handler commandHandler, + final Duration lifespan, + final SpanContext context) { + + Objects.requireNonNull(tenantId); + Objects.requireNonNull(deviceId); + Objects.requireNonNull(commandHandler); + + return doCreateCommandConsumer(tenantId, deviceId, null, commandHandler, lifespan, context); + } + + /** + * {@inheritDoc} + */ + @Override + public final Future createCommandConsumer( + final String tenantId, + final String deviceId, + final String gatewayId, + final Handler commandHandler, + final Duration lifespan, + final SpanContext context) { + + Objects.requireNonNull(tenantId); + Objects.requireNonNull(deviceId); + Objects.requireNonNull(gatewayId); + Objects.requireNonNull(commandHandler); + + return doCreateCommandConsumer(tenantId, deviceId, gatewayId, commandHandler, lifespan, context); + } + + private Future doCreateCommandConsumer( + final String tenantId, + final String deviceId, + final String gatewayId, + final Handler commandHandler, + final Duration lifespan, + final SpanContext context) { + // lifespan greater than what can be expressed in nanoseconds (i.e. 292 years) is considered unlimited, preventing ArithmeticExceptions down the road + final Duration sanitizedLifespan = lifespan == null || lifespan.isNegative() + || lifespan.getSeconds() > (Long.MAX_VALUE / 1000_000_000L) ? Duration.ofSeconds(-1) : lifespan; + log.trace("create command consumer [tenant-id: {}, device-id: {}, gateway-id: {}]", tenantId, deviceId, gatewayId); + return connection.executeOnContext(result -> { + // register the command handler + final CommandHandlerWrapper commandHandlerWrapper = new CommandHandlerWrapper( + tenantId, + deviceId, + gatewayId, + ctx -> commandHandler.handle(new ProtonBasedCommandContext(ctx))); + final CommandHandlerWrapper replacedHandler = adapterInstanceCommandHandler + .putDeviceSpecificCommandHandler(commandHandlerWrapper); + if (replacedHandler != null) { + // TODO find a way to provide a notification here so that potential resources associated with the replaced consumer can be freed (maybe add a commandHandlerOverwritten Handler param to createCommandConsumer()) + } + // associate handler with this adapter instance + final Instant lifespanStart = Instant.now(); + registerCommandConsumer(tenantId, deviceId, sanitizedLifespan, context) + .map(v -> { + return (CommandConsumer) new CommandConsumer() { + @Override + public Future close(final SpanContext spanContext) { + return removeCommandConsumer(commandHandlerWrapper, sanitizedLifespan, + lifespanStart, spanContext); + } + }; + }) + .onComplete(result); + }); + } + + private Future registerCommandConsumer( + final String tenantId, + final String deviceId, + final Duration lifespan, + final SpanContext context) { + + return commandRouterClient.registerCommandConsumer(tenantId, deviceId, adapterInstanceId, lifespan, context) + .recover(thr -> { + log.info("error registering consumer with the command router service [tenant: {}, device: {}]", tenantId, + deviceId, thr); + // handler association failed - unregister the handler + adapterInstanceCommandHandler.removeDeviceSpecificCommandHandler(tenantId, deviceId); + return Future.failedFuture(thr); + }); + } + + private Future removeCommandConsumer( + final CommandHandlerWrapper commandHandlerWrapper, + final Duration lifespan, + final Instant lifespanStart, + final SpanContext onCloseSpanContext) { + + final String tenantId = commandHandlerWrapper.getTenantId(); + final String deviceId = commandHandlerWrapper.getDeviceId(); + + log.trace("remove command consumer [tenant-id: {}, device-id: {}]", tenantId, deviceId); + if (!adapterInstanceCommandHandler.removeDeviceSpecificCommandHandler(commandHandlerWrapper)) { + // This case happens when trying to remove a command consumer which has been overwritten since its creation + // via a 2nd invocation of 'createCommandConsumer' with the same device/tenant id. Since the 2nd 'createCommandConsumer' + // invocation has registered a different 'commandHandlerWrapper' instance (and possibly already removed it), + // trying to remove the original object will return false here. + // On a more abstract level, this case happens when 2 consecutive command subscription requests from the + // same device (with no intermittent disconnect/unsubscribe - possibly because of a broken connection in between) have + // reached the *same* adapter instance and verticle, using this CommandConsumerFactory. Invoking 'removeCommandConsumer' + // on the 1st (obsolete and overwritten) command subscription shall have no impact. Throwing an explicit exception + // here will enable the protocol adapter to detect this case and skip an (incorrect) "disconnectedTtd" event message. + log.debug("command consumer not removed - handler already replaced or removed [tenant: {}, device: {}]", + tenantId, deviceId); + return Future.failedFuture(new ClientErrorException(HttpURLConnection.HTTP_PRECON_FAILED, + "local command handler already replaced or removed")); + } + return commandRouterClient.unregisterCommandConsumer( + tenantId, + deviceId, + adapterInstanceId, + onCloseSpanContext) + .recover(thr -> { + if (ServiceInvocationException.extractStatusCode(thr) == HttpURLConnection.HTTP_PRECON_FAILED) { + final boolean entryMayHaveExpired = !lifespan.isNegative() && Instant.now().isAfter(lifespanStart.plus(lifespan)); + if (entryMayHaveExpired) { + log.trace("ignoring 412 error when unregistering consumer with the command router service; entry may have already expired [tenant: {}, device: {}]", + tenantId, deviceId); + return Future.succeededFuture(); + } else { + // entry wasn't actually removed and entry hasn't expired (yet); + // This case happens when 2 consecutive command subscription requests from the same device + // (with no intermittent disconnect/unsubscribe - possibly because of a broken connection in between) + // have reached *different* protocol adapter instances/verticles. Now calling 'unregisterCommandConsumer' + // on the 1st subscription fails because of the non-matching adapterInstanceId parameter. + // Throwing an explicit exception here will enable the protocol adapter to detect this case + // and skip sending an (incorrect) "disconnectedTtd" event message. + log.debug("consumer not unregistered - not matched or already removed [tenant: {}, device: {}]", + tenantId, deviceId); + return Future.failedFuture(new ClientErrorException(HttpURLConnection.HTTP_PRECON_FAILED, + "no matching command consumer mapping found to be removed")); + } + } else { + log.info("error unregistering consumer with the command router service [tenant: {}, device: {}]", tenantId, + deviceId, thr); + return Future.failedFuture(thr); + } + }); + } + + private Future createAdapterSpecificConsumer() { + log.trace("creating new adapter instance command consumer"); + final String adapterInstanceConsumerAddress = CommandConstants.INTERNAL_COMMAND_ENDPOINT + "/" + + adapterInstanceId; + return connection.createReceiver( + adapterInstanceConsumerAddress, + ProtonQoS.AT_LEAST_ONCE, + (delivery, msg) -> adapterInstanceCommandHandler.handleCommandMessage(msg, delivery), + connection.getConfig().getInitialCredits(), + false, // no auto-accept + sourceAddress -> { // remote close hook + log.debug("command receiver link closed remotely"); + invokeRecreateConsumerWithDelay(); + }).map(receiver -> { + log.debug("successfully created adapter specific command consumer"); + adapterSpecificConsumer = receiver; + return receiver; + }).recover(t -> { + log.error("failed to create adapter specific command consumer", t); + return Future.failedFuture(t); + }); + } + + private void recreateConsumer() { + if (recreatingConsumer.compareAndSet(false, true)) { + connection.isConnected(getDefaultConnectionCheckTimeout()) + .compose(res -> { + // recreate adapter specific consumer + if (adapterSpecificConsumer == null || !adapterSpecificConsumer.isOpen()) { + log.debug("recreate adapter specific command consumer link"); + return createAdapterSpecificConsumer(); + } + return Future.succeededFuture(); + }).onComplete(ar -> { + recreatingConsumer.set(false); + if (tryAgainRecreatingConsumer.compareAndSet(true, false) || ar.failed()) { + if (ar.succeeded()) { + // tryAgainRecreatingConsumers was set - try again immediately + recreateConsumer(); + } else { + invokeRecreateConsumerWithDelay(); + } + } + }); + } else { + // if recreateConsumer() was triggered by a remote link closing, that might have occurred after that link was dealt with above; + // therefore be sure recreateConsumer() gets called again once the current invocation has finished. + log.debug("already recreating consumer"); + tryAgainRecreatingConsumer.set(true); + } + } + + private void invokeRecreateConsumerWithDelay() { + connection.getVertx().setTimer(RECREATE_CONSUMER_DELAY, tid -> recreateConsumer()); + } + + private void handleTenantTimeout(final Message msg) { + final String tenantId = msg.body(); + adapterInstanceCommandHandler.getDeviceSpecificCommandHandlers().stream() + .filter(handler -> handler.getTenantId().equals(tenantId)) + .forEach(handler -> { + log.info("timeout of tenant {}: removing command handler for device {}", tenantId, handler.getDeviceId()); + adapterInstanceCommandHandler.removeDeviceSpecificCommandHandler(handler.getTenantId(), handler.getDeviceId()); + }); + } + +} diff --git a/clients/adapter-amqp/src/main/java/org/eclipse/hono/adapter/client/command/amqp/ProtonBasedCommandConsumerFactory.java b/clients/adapter-amqp/src/main/java/org/eclipse/hono/adapter/client/command/amqp/ProtonBasedDelegatingCommandConsumerFactory.java similarity index 69% rename from clients/adapter-amqp/src/main/java/org/eclipse/hono/adapter/client/command/amqp/ProtonBasedCommandConsumerFactory.java rename to clients/adapter-amqp/src/main/java/org/eclipse/hono/adapter/client/command/amqp/ProtonBasedDelegatingCommandConsumerFactory.java index 668b04d5e81..ada467d230c 100644 --- a/clients/adapter-amqp/src/main/java/org/eclipse/hono/adapter/client/command/amqp/ProtonBasedCommandConsumerFactory.java +++ b/clients/adapter-amqp/src/main/java/org/eclipse/hono/adapter/client/command/amqp/ProtonBasedDelegatingCommandConsumerFactory.java @@ -11,16 +11,13 @@ * SPDX-License-Identifier: EPL-2.0 */ - package org.eclipse.hono.adapter.client.command.amqp; import java.time.Duration; import java.util.List; import java.util.Objects; -import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.eclipse.hono.adapter.client.amqp.AbstractServiceClient; -import org.eclipse.hono.adapter.client.command.Command; import org.eclipse.hono.adapter.client.command.CommandConsumer; import org.eclipse.hono.adapter.client.command.CommandConsumerFactory; import org.eclipse.hono.adapter.client.command.CommandContext; @@ -33,27 +30,27 @@ import org.eclipse.hono.client.ProtocolAdapterCommandConsumerFactory.CommandHandlingAdapterInfoAccess; import org.eclipse.hono.client.SendMessageSampler.Factory; import org.eclipse.hono.config.ProtocolAdapterProperties; -import org.eclipse.hono.util.Constants; import org.eclipse.hono.util.RegistrationAssertion; -import io.opentracing.Span; import io.opentracing.SpanContext; import io.opentracing.Tracer; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.json.JsonObject; -import io.vertx.proton.ProtonHelper; /** * A vertx-proton based factory for creating consumers of command messages received via the * AMQP 1.0 Messaging Network. *

- * This implementation wraps a {@link ProtocolAdapterCommandConsumerFactory} and thus also supports - * routing of commands to a target protocol adapter instance. + * This implementation supports delegation of command message handling to another protocol adapter instance + * by routing the message on an adapter-instance specific address. + *

+ * This functionality is implemented via the wrapped default {@link ProtocolAdapterCommandConsumerFactory} + * implementation. * */ -public class ProtonBasedCommandConsumerFactory extends AbstractServiceClient implements CommandConsumerFactory { +public class ProtonBasedDelegatingCommandConsumerFactory extends AbstractServiceClient implements CommandConsumerFactory { private final ProtocolAdapterCommandConsumerFactory factory; @@ -68,7 +65,7 @@ public class ProtonBasedCommandConsumerFactory extends AbstractServiceClient imp * @param tracer The OpenTracing tracer to use for tracking the processing of messages. * @throws NullPointerException if any of the parameters are {@code null}. */ - public ProtonBasedCommandConsumerFactory( + public ProtonBasedDelegatingCommandConsumerFactory( final HonoConnection connection, final Factory samplerFactory, final ProtocolAdapterProperties adapterConfig, @@ -153,7 +150,7 @@ public Future createCommandConsumer( tenantId, deviceId, ctx -> { - commandHandler.handle(new CommandContextAdapter(ctx)); + commandHandler.handle(new ProtonBasedCommandContext(ctx)); }, lifespan, context) @@ -185,7 +182,7 @@ public Future createCommandConsumer( deviceId, gatewayId, ctx -> { - commandHandler.handle(new CommandContextAdapter(ctx)); + commandHandler.handle(new ProtonBasedCommandContext(ctx)); }, lifespan, context) @@ -200,108 +197,4 @@ public Future close(final SpanContext spanContext) { }); } - private static class CommandContextAdapter implements CommandContext { - - private final org.eclipse.hono.client.CommandContext ctx; - private final ProtonBasedCommand command; - - /** - * Creates a new adapter for a context. - * - * @throws NullPointerException if context is {@code null}. - */ - CommandContextAdapter(final org.eclipse.hono.client.CommandContext context) { - this.ctx = Objects.requireNonNull(context); - this.command = new ProtonBasedCommand(context.getCommand()); - } - - /** - * {@inheritDoc} - */ - @Override - public void logCommandToSpan(final Span span) { - command.logToSpan(span); - } - - /** - * {@inheritDoc} - */ - @Override - public Command getCommand() { - return command; - } - - /** - * {@inheritDoc} - */ - @Override - public void accept() { - ctx.accept(); - } - - /** - * {@inheritDoc} - */ - @Override - public void release() { - ctx.release(); - } - - /** - * {@inheritDoc} - */ - @Override - public void modify(final boolean deliveryFailed, final boolean undeliverableHere) { - ctx.modify(deliveryFailed, undeliverableHere); - } - - /** - * {@inheritDoc} - */ - @Override - public void reject(final String cause) { - final ErrorCondition error = ProtonHelper.condition(Constants.AMQP_BAD_REQUEST, cause); - ctx.reject(error); - } - - /** - * {@inheritDoc} - */ - @Override - public T get(final String key) { - return ctx.get(key); - } - - /** - * {@inheritDoc} - */ - @Override - public T get(final String key, final T defaultValue) { - return ctx.get(key, defaultValue); - } - - /** - * {@inheritDoc} - */ - @Override - public void put(final String key, final Object value) { - ctx.put(key, value); - } - - /** - * {@inheritDoc} - */ - @Override - public SpanContext getTracingContext() { - return ctx.getTracingContext(); - } - - /** - * {@inheritDoc} - */ - @Override - public Span getTracingSpan() { - return ctx.getTracingSpan(); - } - } } diff --git a/clients/adapter-amqp/src/main/java/org/eclipse/hono/adapter/client/command/amqp/ProtonBasedDeviceConnectionClient.java b/clients/adapter-amqp/src/main/java/org/eclipse/hono/adapter/client/command/amqp/ProtonBasedDeviceConnectionClient.java index 94ede01b575..007f2bf41fe 100644 --- a/clients/adapter-amqp/src/main/java/org/eclipse/hono/adapter/client/command/amqp/ProtonBasedDeviceConnectionClient.java +++ b/clients/adapter-amqp/src/main/java/org/eclipse/hono/adapter/client/command/amqp/ProtonBasedDeviceConnectionClient.java @@ -52,7 +52,7 @@ public class ProtonBasedDeviceConnectionClient extends AbstractRequestResponseCl * @param connection The connection to the Device Connection service. * @param samplerFactory The factory for creating samplers for tracing AMQP messages being sent. * @param adapterConfig The protocol adapter's configuration properties. - * @throws NullPointerException if any of the parameters other than the cache provider are {@code null}. + * @throws NullPointerException if any of the parameters is {@code null}. */ public ProtonBasedDeviceConnectionClient( final HonoConnection connection, diff --git a/deploy/src/main/deploy/example-permissions.json b/deploy/src/main/deploy/example-permissions.json index cf223250f47..7b3e36ef483 100644 --- a/deploy/src/main/deploy/example-permissions.json +++ b/deploy/src/main/deploy/example-permissions.json @@ -37,6 +37,14 @@ "operation": "tenant/*:*", "activities": [ "EXECUTE" ] }, + { + "resource": "cmd_router/*", + "activities": [ "READ", "WRITE" ] + }, + { + "operation": "cmd_router/*:*", + "activities": [ "EXECUTE" ] + }, { "resource": "device_con/*", "activities": [ "READ", "WRITE" ] diff --git a/deploy/src/main/sandbox/sandbox-permissions.json b/deploy/src/main/sandbox/sandbox-permissions.json index 831e45ebb47..bccf4f9ccf8 100644 --- a/deploy/src/main/sandbox/sandbox-permissions.json +++ b/deploy/src/main/sandbox/sandbox-permissions.json @@ -37,6 +37,14 @@ "operation": "tenant/*:*", "activities": [ "EXECUTE" ] }, + { + "resource": "cmd_router/*", + "activities": [ "READ", "WRITE" ] + }, + { + "operation": "cmd_router/*:*", + "activities": [ "EXECUTE" ] + }, { "resource": "device_con/*", "activities": [ "READ", "WRITE" ] diff --git a/jenkins/Hono-Deploy-Eclipse-Pipeline.groovy b/jenkins/Hono-Deploy-Eclipse-Pipeline.groovy index 6eb2f4254d8..479d9eb46c1 100644 --- a/jenkins/Hono-Deploy-Eclipse-Pipeline.groovy +++ b/jenkins/Hono-Deploy-Eclipse-Pipeline.groovy @@ -57,7 +57,7 @@ def buildAndDeploy(def utils) { jdk: utils.getJDKVersion(), mavenLocalRepo: '.repository', options: [artifactsPublisher(disabled: true)]) { - sh "mvn --projects :hono-service-auth,:hono-service-device-registry-file,:hono-service-device-registry-mongodb,:hono-service-device-connection,:hono-adapter-http-vertx,:hono-adapter-mqtt-vertx,:hono-adapter-kura,:hono-adapter-amqp-vertx,:hono-adapter-lora-vertx,:hono-adapter-sigfox-vertx,:hono-adapter-coap-vertx,:hono-example,:hono-cli -am deploy -DskipTests=true -DcreateJavadoc=true -DenableEclipseJarSigner=true -DskipStaging=true" + sh "mvn --projects :hono-service-auth,:hono-service-device-registry-file,:hono-service-device-registry-mongodb,:hono-service-command-router,:hono-service-device-connection,:hono-adapter-http-vertx,:hono-adapter-mqtt-vertx,:hono-adapter-kura,:hono-adapter-amqp-vertx,:hono-adapter-lora-vertx,:hono-adapter-sigfox-vertx,:hono-adapter-coap-vertx,:hono-example,:hono-cli -am deploy -DskipTests=true -DcreateJavadoc=true -DenableEclipseJarSigner=true -DskipStaging=true" } } } diff --git a/jenkins/Hono-Deploy-Maven-Central-Pipeline.groovy b/jenkins/Hono-Deploy-Maven-Central-Pipeline.groovy index 4d01be18f56..76373693fd8 100644 --- a/jenkins/Hono-Deploy-Maven-Central-Pipeline.groovy +++ b/jenkins/Hono-Deploy-Maven-Central-Pipeline.groovy @@ -62,7 +62,7 @@ def buildAndDeploy(def utils) { mavenLocalRepo: '.repository', mavenSettingsFilePath: "${params.MAVEN_SETTINGS_FILE}", options: [artifactsPublisher(disabled: true)]) { - sh "mvn deploy -pl :hono-service-auth,:hono-service-device-registry-file,:hono-service-device-registry-mongodb,:hono-service-device-connection,:hono-adapter-http-vertx,:hono-adapter-mqtt-vertx,:hono-adapter-kura,:hono-adapter-amqp-vertx,:hono-adapter-lora-vertx,:hono-adapter-sigfox-vertx,:hono-adapter-coap-vertx,:hono-example,:hono-cli -am -DskipTests=true -DcreateGPGSignature=true -DcreateJavadoc=true -DenableEclipseJarSigner=true" + sh "mvn deploy -pl :hono-service-auth,:hono-service-device-registry-file,:hono-service-device-registry-mongodb,:hono-service-command-router,:hono-service-device-connection,:hono-adapter-http-vertx,:hono-adapter-mqtt-vertx,:hono-adapter-kura,:hono-adapter-amqp-vertx,:hono-adapter-lora-vertx,:hono-adapter-sigfox-vertx,:hono-adapter-coap-vertx,:hono-example,:hono-cli -am -DskipTests=true -DcreateGPGSignature=true -DcreateJavadoc=true -DenableEclipseJarSigner=true" } } } diff --git a/jenkins/Hono-Nightly-Pipeline.groovy b/jenkins/Hono-Nightly-Pipeline.groovy index 903567f90f8..c9c666becdc 100644 --- a/jenkins/Hono-Nightly-Pipeline.groovy +++ b/jenkins/Hono-Nightly-Pipeline.groovy @@ -49,7 +49,7 @@ def nightlyBuild(def utils) { stage('Build') { withMaven(maven: utils.getMavenVersion(), jdk: utils.getJDKVersion(), options: [artifactsPublisher(disabled: true)]) { sh 'mvn clean package javadoc:aggregate' - sh 'mvn --projects :hono-service-auth,:hono-service-device-registry-file,:hono-service-device-registry-mongodb,:hono-service-device-connection,:hono-adapter-http-vertx,:hono-adapter-mqtt-vertx,:hono-adapter-kura,:hono-adapter-amqp-vertx,:hono-adapter-lora-vertx,:hono-adapter-sigfox-vertx,:hono-adapter-coap-vertx,:hono-example,:hono-cli -am deploy -DcreateJavadoc=true -DenableEclipseJarSigner=true' + sh 'mvn --projects :hono-service-auth,:hono-service-device-registry-file,:hono-service-device-registry-mongodb,:hono-service-command-router,:hono-service-device-connection,:hono-adapter-http-vertx,:hono-adapter-mqtt-vertx,:hono-adapter-kura,:hono-adapter-amqp-vertx,:hono-adapter-lora-vertx,:hono-adapter-sigfox-vertx,:hono-adapter-coap-vertx,:hono-example,:hono-cli -am deploy -DcreateJavadoc=true -DenableEclipseJarSigner=true' } } } diff --git a/push_hono_images.sh b/push_hono_images.sh index 350f58e8aca..4372e804266 100755 --- a/push_hono_images.sh +++ b/push_hono_images.sh @@ -1,6 +1,6 @@ #!/bin/bash #******************************************************************************* -# Copyright (c) 2016, 2019 Contributors to the Eclipse Foundation +# Copyright (c) 2016, 2020 Contributors to the Eclipse Foundation # # See the NOTICE file(s) distributed with this work for additional # information regarding copyright ownership. @@ -22,6 +22,7 @@ IMAGES="hono-adapter-amqp-vertx \ hono-adapter-mqtt-vertx \ hono-adapter-sigfox-vertx \ hono-service-auth \ + hono-service-command-router \ hono-service-device-connection \ hono-service-device-registry-file \ hono-service-device-registry-mongodb" diff --git a/service-base-quarkus/src/main/java/org/eclipse/hono/service/quarkus/AbstractProtocolAdapterApplication.java b/service-base-quarkus/src/main/java/org/eclipse/hono/service/quarkus/AbstractProtocolAdapterApplication.java index 946f5c58c4f..ea6d953748c 100644 --- a/service-base-quarkus/src/main/java/org/eclipse/hono/service/quarkus/AbstractProtocolAdapterApplication.java +++ b/service-base-quarkus/src/main/java/org/eclipse/hono/service/quarkus/AbstractProtocolAdapterApplication.java @@ -25,9 +25,10 @@ import org.eclipse.hono.adapter.client.command.CommandRouterClient; import org.eclipse.hono.adapter.client.command.DeviceConnectionClient; import org.eclipse.hono.adapter.client.command.DeviceConnectionClientAdapter; -import org.eclipse.hono.adapter.client.command.amqp.ProtonBasedCommandConsumerFactory; import org.eclipse.hono.adapter.client.command.amqp.ProtonBasedCommandResponseSender; import org.eclipse.hono.adapter.client.command.amqp.ProtonBasedCommandRouterClient; +import org.eclipse.hono.adapter.client.command.amqp.ProtonBasedCommandRouterCommandConsumerFactoryImpl; +import org.eclipse.hono.adapter.client.command.amqp.ProtonBasedDelegatingCommandConsumerFactory; import org.eclipse.hono.adapter.client.command.amqp.ProtonBasedDeviceConnectionClient; import org.eclipse.hono.adapter.client.registry.CredentialsClient; import org.eclipse.hono.adapter.client.registry.DeviceRegistrationClient; @@ -257,8 +258,8 @@ protected CommandConsumerFactory commandConsumerFactory( final DeviceRegistrationClient deviceRegistrationClient) { LOG.debug("using Device Connection service client, configuring CommandConsumerFactory [{}]", - ProtonBasedCommandConsumerFactory.class.getName()); - return new ProtonBasedCommandConsumerFactory( + ProtonBasedDelegatingCommandConsumerFactory.class.getName()); + return new ProtonBasedDelegatingCommandConsumerFactory( commandConsumerConnection(), messageSamplerFactory, protocolAdapterProperties, @@ -274,12 +275,15 @@ protected CommandConsumerFactory commandConsumerFactory( * * @param commandRouterClient The client for accessing the Command Router service. * @return The factory. - * @throws UnsupportedOperationException if the factory type is not supported yet */ - protected CommandConsumerFactory commandConsumerFactory( - final CommandRouterClient commandRouterClient) { - LOG.debug("using Command Router service client, configuring CommandConsumerFactory [unknown]"); - throw new UnsupportedOperationException("not supported yet"); + protected CommandConsumerFactory commandConsumerFactory(final CommandRouterClient commandRouterClient) { + LOG.debug("using Command Router service client, configuring CommandConsumerFactory [{}}]", + ProtonBasedCommandRouterCommandConsumerFactoryImpl.class.getName()); + return new ProtonBasedCommandRouterCommandConsumerFactoryImpl( + commandConsumerConnection(), + messageSamplerFactory, + protocolAdapterProperties, + commandRouterClient); } /** diff --git a/service-base/src/main/java/org/eclipse/hono/service/AbstractAdapterConfig.java b/service-base/src/main/java/org/eclipse/hono/service/AbstractAdapterConfig.java index ba92c910425..6e01fccbbc3 100644 --- a/service-base/src/main/java/org/eclipse/hono/service/AbstractAdapterConfig.java +++ b/service-base/src/main/java/org/eclipse/hono/service/AbstractAdapterConfig.java @@ -22,8 +22,10 @@ import org.eclipse.hono.adapter.client.command.CommandRouterClient; import org.eclipse.hono.adapter.client.command.DeviceConnectionClient; import org.eclipse.hono.adapter.client.command.DeviceConnectionClientAdapter; -import org.eclipse.hono.adapter.client.command.amqp.ProtonBasedCommandConsumerFactory; import org.eclipse.hono.adapter.client.command.amqp.ProtonBasedCommandResponseSender; +import org.eclipse.hono.adapter.client.command.amqp.ProtonBasedCommandRouterClient; +import org.eclipse.hono.adapter.client.command.amqp.ProtonBasedCommandRouterCommandConsumerFactoryImpl; +import org.eclipse.hono.adapter.client.command.amqp.ProtonBasedDelegatingCommandConsumerFactory; import org.eclipse.hono.adapter.client.command.amqp.ProtonBasedDeviceConnectionClient; import org.eclipse.hono.adapter.client.registry.CredentialsClient; import org.eclipse.hono.adapter.client.registry.DeviceRegistrationClient; @@ -53,6 +55,7 @@ import org.eclipse.hono.service.resourcelimits.PrometheusBasedResourceLimitChecksConfig; import org.eclipse.hono.service.resourcelimits.ResourceLimitChecks; import org.eclipse.hono.util.CommandConstants; +import org.eclipse.hono.util.CommandRouterConstants; import org.eclipse.hono.util.Constants; import org.eclipse.hono.util.CredentialsConstants; import org.eclipse.hono.util.DeviceConnectionConstants; @@ -581,7 +584,7 @@ public HonoConnection deviceConnectionServiceConnection() { @Scope("prototype") @ConditionalOnProperty(prefix = "hono.device-connection", name = "host") public DeviceConnectionClient deviceConnectionClient( - final SendMessageSampler.Factory samplerFactory, + final SendMessageSampler.Factory samplerFactory, final ProtocolAdapterProperties adapterConfig) { return new ProtonBasedDeviceConnectionClient( @@ -590,6 +593,70 @@ public DeviceConnectionClient deviceConnectionClient( adapterConfig); } + /** + * Exposes configuration properties for accessing the command router service as a Spring bean. + * + * @return The properties. + */ + @Bean + @Qualifier(CommandRouterConstants.COMMAND_ROUTER_ENDPOINT) + @ConfigurationProperties(prefix = "hono.command-router") + @ConditionalOnProperty(prefix = "hono.command-router", name = "host") + public RequestResponseClientConfigProperties commandRouterServiceClientConfig() { + final RequestResponseClientConfigProperties config = Optional.ofNullable(getCommandRouterClientConfigDefaults()) + .orElseGet(RequestResponseClientConfigProperties::new); + setConfigServerRoleIfUnknown(config, "Command Router"); + setDefaultConfigNameIfNotSet(config); + return config; + } + + /** + * Gets the default client properties, on top of which the configured properties will be loaded, to be then provided + * via {@link #commandRouterServiceClientConfig()}. + *

+ * This method returns an empty set of properties by default. Subclasses may override this method to set specific + * properties. + * + * @return The properties. + */ + protected RequestResponseClientConfigProperties getCommandRouterClientConfigDefaults() { + return new RequestResponseClientConfigProperties(); + } + + /** + * Exposes the connection used for accessing the command router service as a Spring bean. + * + * @return The connection. + */ + @Bean + @Qualifier(CommandRouterConstants.COMMAND_ROUTER_ENDPOINT) + @Scope("prototype") + @ConditionalOnProperty(prefix = "hono.command-router", name = "host") + public HonoConnection commandRouterServiceConnection() { + return HonoConnection.newConnection(vertx(), commandRouterServiceClientConfig()); + } + + /** + * Exposes a client for accessing the Command Router API as a Spring bean. + * + * @param samplerFactory The sampler factory to use. + * @param adapterConfig The protocol adapter's configuration properties. + * @return The client. + */ + @Bean + @Qualifier(CommandRouterConstants.COMMAND_ROUTER_ENDPOINT) + @Scope("prototype") + @ConditionalOnProperty(prefix = "hono.command-router", name = "host") + public CommandRouterClient commandRouterClient( + final SendMessageSampler.Factory samplerFactory, + final ProtocolAdapterProperties adapterConfig) { + + return new ProtonBasedCommandRouterClient( + commandRouterServiceConnection(), + samplerFactory, + adapterConfig); + } + /** * Exposes configuration properties for Command and Control. * @@ -637,8 +704,8 @@ CommandConsumerFactory commandConsumerFactory( final DeviceRegistrationClient registrationClient) { LOG.debug("using Device Connection service client, configuring CommandConsumerFactory [{}]", - ProtonBasedCommandConsumerFactory.class.getName()); - return new ProtonBasedCommandConsumerFactory( + ProtonBasedDelegatingCommandConsumerFactory.class.getName()); + return new ProtonBasedDelegatingCommandConsumerFactory( commandConsumerConnection(), samplerFactory, adapterProperties, @@ -652,8 +719,13 @@ CommandConsumerFactory commandConsumerFactory( final SendMessageSampler.Factory samplerFactory, final CommandRouterClient commandRouterClient) { - LOG.debug("using Command Router service client, configuring CommandConsumerFactory [unknown]"); - throw new UnsupportedOperationException("not implemented yet"); + LOG.debug("using Command Router service client, configuring CommandConsumerFactory [{}}]", + ProtonBasedCommandRouterCommandConsumerFactoryImpl.class.getName()); + return new ProtonBasedCommandRouterCommandConsumerFactoryImpl( + commandConsumerConnection(), + samplerFactory, + adapterProperties, + commandRouterClient); } /** @@ -666,7 +738,7 @@ CommandConsumerFactory commandConsumerFactory( @Bean @Scope("prototype") public CommandResponseSender commandResponseSender( - final SendMessageSampler.Factory samplerFactory, + final SendMessageSampler.Factory samplerFactory, final ProtocolAdapterProperties adapterConfig) { return new ProtonBasedCommandResponseSender( diff --git a/service-base/src/main/java/org/eclipse/hono/service/AbstractProtocolAdapterBase.java b/service-base/src/main/java/org/eclipse/hono/service/AbstractProtocolAdapterBase.java index e8e8fafc116..6141fa932aa 100644 --- a/service-base/src/main/java/org/eclipse/hono/service/AbstractProtocolAdapterBase.java +++ b/service-base/src/main/java/org/eclipse/hono/service/AbstractProtocolAdapterBase.java @@ -1028,7 +1028,7 @@ public void registerReadinessChecks(final HealthCheckHandler handler) { ((ServiceClient) telemetrySender).registerReadinessChecks(handler); } if (eventSender instanceof ServiceClient) { - ((ServiceClient ) eventSender).registerReadinessChecks(handler); + ((ServiceClient) eventSender).registerReadinessChecks(handler); } } @@ -1064,7 +1064,7 @@ public void registerLivenessChecks(final HealthCheckHandler handler) { ((ServiceClient) telemetrySender).registerLivenessChecks(handler); } if (eventSender instanceof ServiceClient) { - ((ServiceClient ) eventSender).registerLivenessChecks(handler); + ((ServiceClient) eventSender).registerLivenessChecks(handler); } } diff --git a/tests/pom.xml b/tests/pom.xml index fe42f77eb7f..10ae5da4fe8 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -39,6 +39,7 @@ Test cases are run against Docker images of Hono server + (Apache Qpid Dispatch --> hono-dispatch-router.hono hono-service-device-connection.hono + hono-service-command-router.hono hono-service-device-registry.hono hono-service-auth.hono @@ -70,6 +71,16 @@ Test cases are run against Docker images of Hono server + (Apache Qpid Dispatch 4 + + deviceConnection + ${hono.device-connection.host} + true + false + @@ -251,6 +262,21 @@ Test cases are run against Docker images of Hono server + (Apache Qpid Dispatch 240000 + + command-router + + + hono.commandrouting.mode + commandrouter + + + + commandRouter + ${hono.commandrouter.host} + false + true + + device-registry-jdbc @@ -477,6 +503,7 @@ Test cases are run against Docker images of Hono server + (Apache Qpid Dispatch ${docker.repository}/hono-jaeger-all-in-one-test jaeger + ${jaeger.disabled} IfNotPresent ${jaeger.image.name} @@ -808,6 +835,7 @@ Test cases are run against Docker images of Hono server + (Apache Qpid Dispatch ${docker.repository}/hono-service-device-connection-test device-connection + ${hono.device-connection.disabled} IfNotPresent ${docker.repository}/hono-service-device-connection:${project.version} @@ -837,6 +865,7 @@ Test cases are run against Docker images of Hono server + (Apache Qpid Dispatch + ${hono.device-connection.disabled} +deviceconnection.ip:deviceconnection.amqp.port:5672 +deviceconnection.ip:deviceconnection.health.port:${vertx.health.port} @@ -875,6 +904,80 @@ Test cases are run against Docker images of Hono server + (Apache Qpid Dispatch + + + ${docker.repository}/hono-service-command-router-test + command-router + + ${hono.commandrouter.disabled} + IfNotPresent + ${docker.repository}/hono-service-command-router:${project.version} + + dir + / + + config + + + ${project.build.directory}/resources/commandrouter + etc/hono + + * + + + + ${project.build.directory}/certs + etc/hono/certs + + command-router-*.pem + auth-server-cert.pem + trusted-certs.pem + + + + + + + + ${hono.commandrouter.disabled} + + +commandrouter.ip:commandrouter.amqp.port:5672 + +commandrouter.ip:commandrouter.health.port:${vertx.health.port} + + ${project.build.directory}/docker/commandrouter.port.properties + + custom + hono + hono-service-command-router.hono + + 314572800 + 314572800 + + file:///etc/hono/logback-spring.xml + file:///etc/hono/ + ${logging.profile},embedded-cache,enable-device-connection-endpoint + ${default.java.options} + 0 + command-router + ${jaeger.host} + 6831 + const + 1 + + + CMDROUTER + ${log.color.hono-services} + + + + + GET + http://${commandrouter.ip}:${commandrouter.health.port}/readiness + 200..299 + + + + ${docker.repository}/hono-adapter-http-test @@ -1247,8 +1350,11 @@ Test cases are run against Docker images of Hono server + (Apache Qpid Dispatch ${qpid.amqp.port} consumer@HONO verysecret + ${hono.commandrouter.disabled} ${deviceconnection.ip} ${deviceconnection.amqp.port} + ${commandrouter.ip} + ${commandrouter.amqp.port} ${deviceregistry.ip} ${deviceregistry.amqp.port} ${deviceregistry.http.port} @@ -1307,6 +1413,7 @@ Test cases are run against Docker images of Hono server + (Apache Qpid Dispatch ${project.build.directory}/docker/auth.port.properties ${project.build.directory}/docker/deviceregistry.port.properties ${project.build.directory}/docker/deviceconnection.port.properties + ${project.build.directory}/docker/commandrouter.port.properties ${project.build.directory}/docker/broker.port.properties ${project.build.directory}/docker/qpid.port.properties ${project.build.directory}/docker/adapter.http.port.properties diff --git a/tests/readme.md b/tests/readme.md index bf3df642fb1..1a518b7b275 100644 --- a/tests/readme.md +++ b/tests/readme.md @@ -97,8 +97,14 @@ In order to stop and remove the Docker containers started by a test run, use: ### Running the Tests with the Quarkus based Protocol Adapters By default, the integration tests are run using the Spring Boot based protocol adapters. For some protocol adapters there are -Quarkus based alternative implememtations. The tests can be run using these Quarkus based adapters by means of activating +Quarkus based alternative implementations. The tests can be run using these Quarkus based adapters by means of activating the `protocol-adapters-quarkus` maven profile: $ mvn verify -Prun-tests,protocol-adapters-quarkus +### Running the Tests with the Command Router component + +By default, the integration tests are run using the Device Connection service component. In order to use the Command +Router service component instead, the `command-router` maven profile can be set: + + $ mvn verify -Prun-tests,command-router diff --git a/tests/src/test/java/org/eclipse/hono/tests/IntegrationTestSupport.java b/tests/src/test/java/org/eclipse/hono/tests/IntegrationTestSupport.java index bd6ad969887..9fbe6ee567f 100644 --- a/tests/src/test/java/org/eclipse/hono/tests/IntegrationTestSupport.java +++ b/tests/src/test/java/org/eclipse/hono/tests/IntegrationTestSupport.java @@ -169,6 +169,10 @@ public final class IntegrationTestSupport { * has access to all tenants. */ public static final String PROPERTY_TENANT_ADMIN_PASSWORD = "tenant.admin.password"; + /** + * The name of the system property to use for indicating whether the Device Connection service is enabled. + */ + public static final String PROPERTY_DEVICECONNECTION_SERVICE_ENABLED = "deviceconnection.enabled"; /** * The name of the system property to use for setting the IP address of the Device Connection service. */ @@ -323,6 +327,11 @@ public final class IntegrationTestSupport { */ public static final int HONO_DEVICEREGISTRY_HTTP_PORT = Integer.getInteger(PROPERTY_DEVICEREGISTRY_HTTP_PORT, DEFAULT_DEVICEREGISTRY_HTTP_PORT); + /** + * The boolean value indicating whether the Device Connection service is enabled. + */ + public static final boolean HONO_DEVICECONNECTION_SERVICE_ENABLED = Boolean.parseBoolean(System.getProperty( + PROPERTY_DEVICECONNECTION_SERVICE_ENABLED, "true")); /** * The IP address of the Device Connection service. */ @@ -541,6 +550,17 @@ public static ClientConfigProperties getDeviceRegistryProperties(final String us password); } + /** + * Checks if the Device Connection service is enabled. + *

+ * Evaluates the system property deviceconnection.enabled. Returns {@code true} if it doesn't exist. + * + * @return {@code true} if the Device Connection service is enabled. + */ + public static boolean isDeviceConnectionServiceEnabled() { + return HONO_DEVICECONNECTION_SERVICE_ENABLED; + } + /** * Creates properties for connecting to the Device Connection service. * diff --git a/tests/src/test/java/org/eclipse/hono/tests/registry/DeviceConnectionApiTests.java b/tests/src/test/java/org/eclipse/hono/tests/registry/DeviceConnectionApiTests.java index 872aefed5bd..b01819cdcf6 100644 --- a/tests/src/test/java/org/eclipse/hono/tests/registry/DeviceConnectionApiTests.java +++ b/tests/src/test/java/org/eclipse/hono/tests/registry/DeviceConnectionApiTests.java @@ -15,6 +15,7 @@ package org.eclipse.hono.tests.registry; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assumptions.assumeTrue; import java.net.HttpURLConnection; import java.time.Duration; @@ -23,7 +24,9 @@ import java.util.concurrent.TimeUnit; import org.eclipse.hono.client.DeviceConnectionClient; +import org.eclipse.hono.tests.IntegrationTestSupport; import org.eclipse.hono.util.DeviceConnectionConstants; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import io.vertx.core.Future; @@ -40,6 +43,11 @@ */ abstract class DeviceConnectionApiTests extends DeviceRegistryTestBase { + @BeforeAll + public static void setup() { + assumeTrue(IntegrationTestSupport.isDeviceConnectionServiceEnabled()); + } + /** * Gets a client for interacting with the Device Connection service. * diff --git a/tests/src/test/resources/amqp/application.yml b/tests/src/test/resources/amqp/application.yml index a3f1f9a7a30..92459412196 100644 --- a/tests/src/test/resources/amqp/application.yml +++ b/tests/src/test/resources/amqp/application.yml @@ -53,10 +53,10 @@ hono: linkEstablishmentTimeout: ${link.establishment.timeout} flowLatency: ${flow.latency} requestTimeout: ${request.timeout} - deviceConnection: + ${hono.command-related-service.configname}: name: 'Hono AMQP Adapter' - host: ${hono.device-connection.host} - port: 5672 # AMQP port of the device registry + host: ${hono.command-related-service.host} + port: 5672 username: amqp-adapter@HONO password: amqp-secret linkEstablishmentTimeout: ${link.establishment.timeout} diff --git a/tests/src/test/resources/auth/permissions.json b/tests/src/test/resources/auth/permissions.json index 701aba6cec2..3247e347077 100644 --- a/tests/src/test/resources/auth/permissions.json +++ b/tests/src/test/resources/auth/permissions.json @@ -33,6 +33,14 @@ "operation": "tenant/*:*", "activities": [ "EXECUTE" ] }, + { + "resource": "cmd_router/*", + "activities": [ "READ", "WRITE" ] + }, + { + "operation": "cmd_router/*:*", + "activities": [ "EXECUTE" ] + }, { "resource": "device_con/*", "activities": [ "READ", "WRITE" ] @@ -42,6 +50,16 @@ "activities": [ "EXECUTE" ] } ], + "command-router": [ + { + "resource": "registration/*", + "activities": [ "READ", "WRITE" ] + }, + { + "operation": "registration/*:*", + "activities": [ "EXECUTE" ] + } + ], "DEFAULT_TENANT-manager": [ { "resource": "registration/DEFAULT_TENANT", @@ -173,6 +191,14 @@ "protocol-adapter" ] }, + "command-router@HONO": { + "mechanism": "PLAIN", + "password": "cmd-router-secret", + "authorities": [ + "hono-component", + "command-router" + ] + }, "hono-client": { "mechanism": "PLAIN", "password": "secret", diff --git a/tests/src/test/resources/coap/application.yml b/tests/src/test/resources/coap/application.yml index c3960a21206..5bb771e528f 100644 --- a/tests/src/test/resources/coap/application.yml +++ b/tests/src/test/resources/coap/application.yml @@ -52,9 +52,9 @@ hono: linkEstablishmentTimeout: ${link.establishment.timeout} flowLatency: ${flow.latency} requestTimeout: ${request.timeout} - deviceConnection: + ${hono.command-related-service.configname}: name: 'Hono CoAP Adapter' - host: ${hono.device-connection.host} + host: ${hono.command-related-service.host} port: 5672 username: coap-adapter@HONO password: coap-secret diff --git a/tests/src/test/resources/commandrouter/application.yml b/tests/src/test/resources/commandrouter/application.yml new file mode 100644 index 00000000000..220e8279886 --- /dev/null +++ b/tests/src/test/resources/commandrouter/application.yml @@ -0,0 +1,43 @@ +hono: + app: + maxInstances: 1 + startupTimeout: 90 + healthCheck: + insecurePortBindAddress: 0.0.0.0 + insecurePort: ${vertx.health.port} + auth: + host: ${hono.auth.host} + port: 5672 + name: command-router + validation: + certPath: /etc/hono/certs/auth-server-cert.pem + commandRouter: + amqp: + insecurePortEnabled: true + insecurePortBindAddress: 0.0.0.0 + registration: + name: 'Hono Command Router' + host: ${hono.registration.host} + port: 5672 + username: command-router@HONO + password: cmd-router-secret + linkEstablishmentTimeout: ${link.establishment.timeout} + flowLatency: ${flow.latency} + requestTimeout: ${request.timeout} + command: + name: 'Hono Command Router' + host: ${hono.amqp-network.host} + port: 5673 + amqpHostname: hono-internal + keyPath: /etc/hono/certs/command-router-key.pem + certPath: /etc/hono/certs/command-router-cert.pem + trustStorePath: /etc/hono/certs/trusted-certs.pem + linkEstablishmentTimeout: ${link.establishment.timeout} + flowLatency: ${flow.latency} + requestTimeout: ${request.timeout} + vertx: + maxEventLoopExecuteTime: ${max.event-loop.execute-time} + +spring: + jmx: + enabled: false diff --git a/tests/src/test/resources/commandrouter/cache-config.xml b/tests/src/test/resources/commandrouter/cache-config.xml new file mode 100644 index 00000000000..f491df8edbf --- /dev/null +++ b/tests/src/test/resources/commandrouter/cache-config.xml @@ -0,0 +1,21 @@ + + + + + + + + + + diff --git a/tests/src/test/resources/commandrouter/logback-spring.xml b/tests/src/test/resources/commandrouter/logback-spring.xml new file mode 100644 index 00000000000..25c646c64b3 --- /dev/null +++ b/tests/src/test/resources/commandrouter/logback-spring.xml @@ -0,0 +1,61 @@ + + + + + + + + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/tests/src/test/resources/http/application.yml b/tests/src/test/resources/http/application.yml index 498ac4fbd20..bda02ce8828 100644 --- a/tests/src/test/resources/http/application.yml +++ b/tests/src/test/resources/http/application.yml @@ -53,9 +53,9 @@ hono: linkEstablishmentTimeout: ${link.establishment.timeout} flowLatency: ${flow.latency} requestTimeout: ${request.timeout} - deviceConnection: + ${hono.command-related-service.configname}: name: 'Hono HTTP Adapter' - host: ${hono.device-connection.host} + host: ${hono.command-related-service.host} port: 5672 username: http-adapter@HONO password: http-secret diff --git a/tests/src/test/resources/mqtt/application.yml b/tests/src/test/resources/mqtt/application.yml index 8c81992a95c..689f240fbcd 100644 --- a/tests/src/test/resources/mqtt/application.yml +++ b/tests/src/test/resources/mqtt/application.yml @@ -53,9 +53,9 @@ hono: linkEstablishmentTimeout: ${link.establishment.timeout} flowLatency: ${flow.latency} requestTimeout: ${request.timeout} - deviceConnection: + ${hono.command-related-service.configname}: name: 'Hono MQTT Adapter' - host: ${hono.device-connection.host} + host: ${hono.command-related-service.host} port: 5672 username: mqtt-adapter@HONO password: mqtt-secret diff --git a/tests/src/test/resources/qpid/qdrouterd-with-broker.json b/tests/src/test/resources/qpid/qdrouterd-with-broker.json index 2fcfa0db9ce..90c8034b266 100644 --- a/tests/src/test/resources/qpid/qdrouterd-with-broker.json +++ b/tests/src/test/resources/qpid/qdrouterd-with-broker.json @@ -110,7 +110,7 @@ "maxConnections": 40, "groups": { "Hono": { - "users": "Eclipse IoT;Hono;http-adapter,Eclipse IoT;Hono;mqtt-adapter,Eclipse IoT;Hono;amqp-adapter,Eclipse IoT;Hono;coap-adapter", + "users": "Eclipse IoT;Hono;http-adapter,Eclipse IoT;Hono;mqtt-adapter,Eclipse IoT;Hono;amqp-adapter,Eclipse IoT;Hono;coap-adapter,Eclipse IoT;Hono;command-router", "remoteHosts": "*", "maxSessions": 2, "maxMessageSize": 131072,