From da368351dd435c0e30a5369755c1f7e7f3241666 Mon Sep 17 00:00:00 2001 From: Kai Hudalla Date: Mon, 15 Apr 2019 18:26:11 +0200 Subject: [PATCH] [#780] Implement CommandConsumerFactoryImpl. The methods for creating command consumer links and CommandResponseSender instances no longer depend on HonoConnectionImpl. Signed-off-by: Kai Hudalla --- .../hono/client/CommandConsumerFactory.java | 12 + .../eclipse/hono/client/HonoConnection.java | 72 ++--- .../impl/CommandConsumerFactoryImpl.java | 227 +++++++--------- .../hono/client/impl/HonoConnectionImpl.java | 70 +---- .../impl/CommandConsumerFactoryImplTest.java | 255 ++++++++---------- .../client/impl/HonoConnectionImplTest.java | 34 --- .../hono/service/AbstractAdapterConfig.java | 3 +- site/content/release-notes.md | 31 ++- 8 files changed, 306 insertions(+), 398 deletions(-) diff --git a/client/src/main/java/org/eclipse/hono/client/CommandConsumerFactory.java b/client/src/main/java/org/eclipse/hono/client/CommandConsumerFactory.java index dc71c6a179..4c9b4a3b5c 100644 --- a/client/src/main/java/org/eclipse/hono/client/CommandConsumerFactory.java +++ b/client/src/main/java/org/eclipse/hono/client/CommandConsumerFactory.java @@ -14,6 +14,7 @@ package org.eclipse.hono.client; import org.eclipse.hono.client.impl.CommandConsumer; +import org.eclipse.hono.client.impl.CommandConsumerFactoryImpl; import io.vertx.core.Future; import io.vertx.core.Handler; @@ -24,6 +25,17 @@ */ public interface CommandConsumerFactory extends ConnectionLifecycle { + /** + * Creates a new factory for an existing connection. + * + * @param connection The connection to use. + * @return The factory. + * @throws NullPointerException if connection is {@code null} + */ + static CommandConsumerFactory create(final HonoConnection connection) { + return new CommandConsumerFactoryImpl(connection); + } + /** * Creates a command consumer for a device. *

diff --git a/client/src/main/java/org/eclipse/hono/client/HonoConnection.java b/client/src/main/java/org/eclipse/hono/client/HonoConnection.java index 0d5ddab64e..6de20009a3 100644 --- a/client/src/main/java/org/eclipse/hono/client/HonoConnection.java +++ b/client/src/main/java/org/eclipse/hono/client/HonoConnection.java @@ -31,43 +31,38 @@ import io.vertx.proton.ProtonSender; /** - * A factory for creating clients for Hono's arbitrary APIs. + * A wrapper around a single AMQP 1.0 connection and a single session to a Hono service endpoint. *

- * A factory maintains a single AMQP 1.0 connection and a single session to the peer. This session is shared by all AMQP - * 1.0 links established for senders, consumers and clients created using the corresponding - * factory methods. + * The session is shared by all AMQP 1.0 links established for sender and receiver + * links created using the corresponding factory methods. *

- * The getOrCreate factory methods return an existing client for the given address if available. Note that - * factory methods for creating consumers always return a new instance so that all messages received are only - * processed by the handler passed in to the factory method. + * Before any links can be created, the underlying AMQP connection needs to be established. This is done by + * invoking one of the connect methods. *

- * Before any of the factory methods can be invoked successfully, the client needs to connect to Hono. This is done by - * invoking one of the client's connect methods. - *

- * An AMQP connection is established in multiple stages: + * This class represents the client side when establishing the AMQP connection: *

    - *
  1. The client establishes a TCP connection to the peer. For this to succeed, the peer must have registered a + *
  2. The client tries to establish a TCP/TLS connection to the peer. For this to succeed, the peer must have registered a * socket listener on the IP address and port that the client is configured to use.
  3. - *
  4. The client performs a SASL handshake with the peer if required by the peer. The client needs to be + *
  5. The client initiates a SASL handshake if requested by the peer. The client needs to be * configured with correct credentials in order for this stage to succeed.
  6. *
  7. Finally, the client and the peer need to agree on AMQP 1.0 specific connection parameters like capabilities * and session window size.
  8. *
* Some of the connect methods accept a {@code ProtonClientOptions} type parameter. Note that these options - * only influence the client's behavior when establishing the TCP connection with the peer. The overall behavior of + * only influence the client's behavior when establishing the TCP/TLS connection with the peer. The overall behavior of * the client regarding the establishment of the AMQP connection must be configured using the - * {@link ClientConfigProperties} passed in to the newClient method. In particular, the - * {@link ClientConfigProperties#setReconnectAttempts(int)} method can be used to specify, how many - * times the client should try to establish a connection to the peer before giving up. + * {@link ClientConfigProperties} passed in to the {@link #newConnection(Vertx, ClientConfigProperties)} method. + * In particular, the {@link ClientConfigProperties#setReconnectAttempts(int)} method can be used to specify, + * how many times the client should try to establish a connection to the peer before giving up. *

* NB When the client tries to establish a connection to the peer, it stores the current * vert.x {@code Context} in a local variable and performs all further interactions with the peer running - * on this Context. Invoking any of the methods of the client from a vert.x Context other than the one - * used for establishing the connection may cause race conditions or even deadlocks because the handlers + * on this Context. Invoking any of the connection's methods from a vert.x Context other than the one + * used for establishing the connection may cause race conditions or even deadlocks, because the handlers * registered on the {@code Future}s returned by these methods will be invoked from the stored Context. - * It is the invoking code's responsibility to either ensure that the client's methods are always invoked + * It is the invoking code's responsibility to either ensure that the connection's methods are always invoked * from the same Context or to make sure that the handlers are running on the correct Context, e.g. by using - * the {@code Context}'s runOnContext method. + * the {@link #executeOrRunOnContext(Handler)} method. */ public interface HonoConnection extends ConnectionLifecycle { @@ -115,19 +110,19 @@ static HonoConnection newConnection(final Vertx vertx, final ClientConfigPropert * {@inheritDoc} * * The connection will be established using default options. - * With the default options a client will try three times to establish a TCP connection to the peer - * before giving up. Each attempt will be canceled after 200ms and the client will wait 500ms - * before making the next attempt. Note that each connection attempt is made using the same IP - * address that has been resolved when the method was initially invoked. + *

+ * With the default options the client will try to establish a TCP connection to the peer a single + * time and then give up. *

* Once a TCP connection is established, the client performs a SASL handshake (if requested by the * peer) using the credentials set in the {@link ClientConfigProperties}. Finally, the client * opens the AMQP connection to the peer, based on the negotiated parameters. *

- * The number of times that the client should try to establish the AMQP connection with the peer + * The overall number of times that the client should try to establish the AMQP connection with the peer * can be configured by means of the connectAttempts property of the * {@code ClientConfigProperties} passed in to the {@link #newConnection(Vertx, ClientConfigProperties)} - * method. + * method. The client will perform a new DNS lookup of the peer's hostname with each attempt to + * establish the AMQP connection. *

* When an established connection to the peer fails, the client will automatically try to re-connect * to the peer using the same options and behavior as used for establishing the initial connection. @@ -160,7 +155,8 @@ static HonoConnection newConnection(final Vertx vertx, final ClientConfigPropert * The number of times that the client should try to establish the AMQP connection with the peer * can be configured by means of the connectAttempts property of the * {@code ClientConfigProperties} passed in to the {@link #newConnection(Vertx, ClientConfigProperties)} - * method. + * method. The client will perform a new DNS lookup of the peer's hostname with each attempt to + * establish the AMQP connection. *

* When an established connection to the peer fails, the client will automatically try to re-connect * to the peer using the same options and behavior as used for establishing the initial connection. @@ -193,7 +189,8 @@ static HonoConnection newConnection(final Vertx vertx, final ClientConfigPropert * The number of times that the client should try to establish the AMQP connection with the peer * can be configured by means of the connectAttempts property of the * {@code ClientConfigProperties} passed in to the {@link #newConnection(Vertx, ClientConfigProperties)} - * method. + * method. The client will perform a new DNS lookup of the peer's hostname with each attempt to + * establish the AMQP connection. *

* When an established connection to the peer fails, the given disconnect handler will be invoked. * Note that the client will not automatically try to re-connect to the peer in this case. @@ -207,7 +204,10 @@ static HonoConnection newConnection(final Vertx vertx, final ClientConfigPropert * established. * * @throws NullPointerException if the disconnect handler is {@code null}. + * @deprecated Use one of the other connect methods instead and use {@link #addDisconnectListener(DisconnectListener)} + * to be notified when the underlying AMQP connection fails unexpectedly. */ + @Deprecated(forRemoval = true, since = "1.0-M2") Future connect(Handler disconnectHandler); /** @@ -225,7 +225,8 @@ static HonoConnection newConnection(final Vertx vertx, final ClientConfigPropert * The number of times that the client should try to establish the AMQP connection with the peer * can be configured by means of the connectAttempts property of the * {@code ClientConfigProperties} passed in to the {@link #newConnection(Vertx, ClientConfigProperties)} - * method. + * method. The client will perform a new DNS lookup of the peer's hostname with each attempt to + * establish the AMQP connection. *

* When an established connection to the peer fails, the given disconnect handler will be invoked. * Note that the client will not automatically try to re-connect to the peer in this case. @@ -242,7 +243,10 @@ static HonoConnection newConnection(final Vertx vertx, final ClientConfigPropert * established, or *

  • the maximum number of (unsuccessful) (re-)connection attempts have been made.
  • * + * @deprecated Use one of the other connect methods instead and use {@link #addDisconnectListener(DisconnectListener)} + * to be notified when the underlying AMQP connection fails unexpectedly. */ + @Deprecated(forRemoval = true, since = "1.0-M2") Future connect( ProtonClientOptions options, Handler disconnectHandler); @@ -282,6 +286,14 @@ Future connect( */ void shutdown(Handler> completionHandler); + /** + * Checks if this connection is shut down. + * + * @return {@code true} if this connection is shut down already + * or is in the process of shutting down. + */ + boolean isShutdown(); + /** * Checks if this client supports a certain capability. *

    diff --git a/client/src/main/java/org/eclipse/hono/client/impl/CommandConsumerFactoryImpl.java b/client/src/main/java/org/eclipse/hono/client/impl/CommandConsumerFactoryImpl.java index ca52834aff..c26b86c948 100644 --- a/client/src/main/java/org/eclipse/hono/client/impl/CommandConsumerFactoryImpl.java +++ b/client/src/main/java/org/eclipse/hono/client/impl/CommandConsumerFactoryImpl.java @@ -23,19 +23,19 @@ import org.eclipse.hono.client.CommandConsumerFactory; import org.eclipse.hono.client.CommandContext; import org.eclipse.hono.client.CommandResponseSender; +import org.eclipse.hono.client.HonoConnection; import org.eclipse.hono.client.MessageConsumer; import org.eclipse.hono.client.ResourceConflictException; -import org.eclipse.hono.config.ClientConfigProperties; -import org.eclipse.hono.connection.ConnectionFactory; +import org.eclipse.hono.client.impl.CommandConsumer; import io.vertx.core.Future; import io.vertx.core.Handler; -import io.vertx.core.Vertx; /** - * Implements a connection between an Adapter and the AMQP 1.0 network to receive commands and send a response. + * A factory for creating clients for the AMQP 1.0 Messaging Network to + * receive commands and send responses. */ -public class CommandConsumerFactoryImpl extends HonoConnectionImpl implements CommandConsumerFactory { +public class CommandConsumerFactoryImpl extends AbstractHonoClientFactory implements CommandConsumerFactory { /** * The minimum number of milliseconds to wait between checking a @@ -43,11 +43,8 @@ public class CommandConsumerFactoryImpl extends HonoConnectionImpl implements Co */ public static final long MIN_LIVENESS_CHECK_INTERVAL_MILLIS = 2000; - /** - * The consumers that can be used to receive command messages. - * The device, which belongs to a tenant is used as the key, e.g. DEFAULT_TENANT/4711. - */ - private final Map commandConsumers = new HashMap<>(); + private final CachingClientFactory commandConsumerFactory; + /** * A mapping of command consumer addresses to vert.x timer IDs which represent the * liveness checks for the consumers. @@ -55,41 +52,22 @@ public class CommandConsumerFactoryImpl extends HonoConnectionImpl implements Co private final Map livenessChecks = new HashMap<>(); /** - * Creates a new client for a set of configuration properties. - *

    - * This constructor creates a connection factory using - * {@link ConnectionFactory#newConnectionFactory(Vertx, ClientConfigProperties)}. - * - * @param vertx The Vert.x instance to execute the client on, if {@code null} a new Vert.x instance is used. - * @param clientConfigProperties The configuration properties to use. - * @throws NullPointerException if clientConfigProperties is {@code null} + * Creates a new factory for an existing connection. + * + * @param connection The connection to use. */ - public CommandConsumerFactoryImpl(final Vertx vertx, final ClientConfigProperties clientConfigProperties) { - super(vertx, clientConfigProperties); + public CommandConsumerFactoryImpl(final HonoConnection connection) { + super(connection); + commandConsumerFactory = new CachingClientFactory<>(c -> true); } - /** - * Creates a new client for a set of configuration properties. - *

    - * This constructor creates a connection factory using - * {@link ConnectionFactory#newConnectionFactory(Vertx, ClientConfigProperties)}. - * - * @param vertx The Vert.x instance to execute the client on, if {@code null} a new Vert.x instance is used. - * @param connectionFactory Factory to invoke for a new connection. - * @param clientConfigProperties The configuration properties to use. - * @throws NullPointerException if clientConfigProperties is {@code null} - */ - public CommandConsumerFactoryImpl(final Vertx vertx, final ConnectionFactory connectionFactory, final ClientConfigProperties clientConfigProperties) { - super(vertx, connectionFactory, clientConfigProperties); + @Override + protected void onDisconnect() { + commandConsumerFactory.clearState(); } - /** - * {@inheritDoc} - */ - @Override - protected void clearState() { - super.clearState(); - commandConsumers.clear(); + private String getKey(final String tenantId, final String deviceId) { + return Device.asAddress(tenantId, deviceId); } /** @@ -99,28 +77,24 @@ protected void clearState() { public final Future createCommandConsumer( final String tenantId, final String deviceId, - final Handler commandConsumer, + final Handler commandHandler, final Handler remoteCloseHandler) { Objects.requireNonNull(tenantId); Objects.requireNonNull(deviceId); - Objects.requireNonNull(commandConsumer); + Objects.requireNonNull(commandHandler); - return executeOrRunOnContext(result -> { - final String key = Device.asAddress(tenantId, deviceId); - final MessageConsumer messageConsumer = commandConsumers.get(key); - if (messageConsumer != null) { + return connection.executeOrRunOnContext(result -> { + final String key = getKey(tenantId, deviceId); + final MessageConsumer commandConsumer = commandConsumerFactory.getClient(key); + if (commandConsumer != null) { log.debug("cannot create concurrent command consumer [tenant: {}, device-id: {}]", tenantId, deviceId); result.fail(new ResourceConflictException("message consumer already in use")); } else { - createConsumer( - tenantId, - () -> newCommandConsumer(tenantId, deviceId, commandConsumer, remoteCloseHandler)) - .map(consumer -> { - commandConsumers.put(key, consumer); - return consumer; - }) - .setHandler(result); + commandConsumerFactory.getOrCreateClient( + key, + () -> newCommandConsumer(tenantId, deviceId, commandHandler, remoteCloseHandler), + result); } }); } @@ -147,16 +121,17 @@ public final Future createCommandConsumer( throw new IllegalArgumentException("liveness check interval must be > 0"); } - return createCommandConsumer(tenantId, deviceId, commandConsumer, remoteCloseHandler).map(c -> { - - final String key = Device.asAddress(tenantId, deviceId); - final long effectiveCheckInterval = Math.max(MIN_LIVENESS_CHECK_INTERVAL_MILLIS, checkInterval); - final long livenessCheckId = vertx.setPeriodic( - effectiveCheckInterval, - newLivenessCheck(tenantId, deviceId, key, commandConsumer, remoteCloseHandler)); - livenessChecks.put(key, livenessCheckId); - return c; - }); + return createCommandConsumer(tenantId, deviceId, commandConsumer, remoteCloseHandler) + .map(c -> { + + final String key = getKey(tenantId, deviceId); + final long effectiveCheckInterval = Math.max(MIN_LIVENESS_CHECK_INTERVAL_MILLIS, checkInterval); + final long livenessCheckId = connection.getVertx().setPeriodic( + effectiveCheckInterval, + newLivenessCheck(tenantId, deviceId, key, commandConsumer, remoteCloseHandler)); + livenessChecks.put(key, livenessCheckId); + return c; + }); } Handler newLivenessCheck( @@ -168,34 +143,40 @@ Handler newLivenessCheck( final AtomicBoolean recreating = new AtomicBoolean(false); return timerId -> { - if (isShutdown()) { - vertx.cancelTimer(timerId); - } else if (isConnectedInternal() && !commandConsumers.containsKey(key)) { - // when a connection is lost unexpectedly, - // all consumers will be removed from the cache - if (recreating.compareAndSet(false, true)) { - // set a lock in order to prevent spawning multiple attempts - // to re-create the consumer - log.debug("trying to re-create command consumer [tenant: {}, device-id: {}]", - tenantId, deviceId); - // we try to re-create the link using the original parameters - // which will put the consumer into the cache again, if successful - createCommandConsumer(tenantId, deviceId, commandConsumer, remoteCloseHandler) - .map(consumer -> { - log.debug("successfully re-created command consumer [tenant: {}, device-id: {}]", - tenantId, deviceId); - return consumer; - }) - .otherwise(t -> { - log.info("failed to re-create command consumer [tenant: {}, device-id: {}]: {}", - tenantId, deviceId, t.getMessage()); - return null; - }) - .setHandler(s -> recreating.compareAndSet(true, false)); - } else { - log.debug("already trying to re-create command consumer [tenant: {}, device-id: {}], yielding ...", - tenantId, deviceId); - } + if (connection.isShutdown()) { + connection.getVertx().cancelTimer(timerId); + } else { + connection.isConnected().map(ok -> { + if (commandConsumerFactory.getClient(key) == null) { + // when a connection is lost unexpectedly, + // all consumers will have been removed from the cache + // so we need to recreate the consumer + if (recreating.compareAndSet(false, true)) { + // set a lock in order to prevent spawning multiple attempts + // to re-create the consumer + log.debug("trying to re-create command consumer [tenant: {}, device-id: {}]", + tenantId, deviceId); + // we try to re-create the link using the original parameters + // which will put the consumer into the cache again, if successful + createCommandConsumer(tenantId, deviceId, commandConsumer, remoteCloseHandler) + .map(consumer -> { + log.debug("successfully re-created command consumer [tenant: {}, device-id: {}]", + tenantId, deviceId); + return consumer; + }) + .otherwise(t -> { + log.info("failed to re-create command consumer [tenant: {}, device-id: {}]: {}", + tenantId, deviceId, t.getMessage()); + return null; + }) + .setHandler(s -> recreating.compareAndSet(true, false)); + } else { + log.debug("already trying to re-create command consumer [tenant: {}, device-id: {}], yielding ...", + tenantId, deviceId); + } + } + return null; + }); } }; } @@ -206,64 +187,64 @@ private Future newCommandConsumer( final Handler commandConsumer, final Handler remoteCloseHandler) { - return checkConnected().compose(con -> { - final String key = Device.asAddress(tenantId, deviceId); - return CommandConsumer.create( - this, + final String key = Device.asAddress(tenantId, deviceId); + return CommandConsumer.create( + connection, tenantId, deviceId, commandConsumer, sourceAddress -> { // local close hook // stop liveness check - Optional.ofNullable(livenessChecks.remove(key)).ifPresent(vertx::cancelTimer); - commandConsumers.remove(key); + Optional.ofNullable(livenessChecks.remove(key)).ifPresent(connection.getVertx()::cancelTimer); + commandConsumerFactory.removeClient(key); }, sourceAddress -> { // remote close hook - commandConsumers.remove(key); + commandConsumerFactory.removeClient(key); remoteCloseHandler.handle(null); - }) - .map(c -> (MessageConsumer) c); - }); + }).map(c -> (MessageConsumer) c); } /** * {@inheritDoc} + * + * This implementation always creates a new sender link. */ - @Deprecated @Override - public Future closeCommandConsumer(final String tenantId, final String deviceId) { + public Future getCommandResponseSender( + final String tenantId, + final String replyId) { Objects.requireNonNull(tenantId); - Objects.requireNonNull(deviceId); + Objects.requireNonNull(replyId); - return executeOrRunOnContext(result -> { - final String deviceAddress = Device.asAddress(tenantId, deviceId); - // stop liveness check - Optional.ofNullable(livenessChecks.remove(deviceAddress)).ifPresent(vertx::cancelTimer); - // close and remove link from cache - Optional.ofNullable(commandConsumers.remove(deviceAddress)).ifPresent(consumer -> { - consumer.close(result); - }); + return connection.executeOrRunOnContext(result -> { + CommandResponseSenderImpl.create( + connection, + tenantId, + replyId, + onRemoteClose -> {}) + .setHandler(result); }); } /** * {@inheritDoc} - * - * This implementation always creates a new sender link. */ + @Deprecated @Override - public Future getCommandResponseSender( - final String tenantId, - final String replyId) { + public Future closeCommandConsumer(final String tenantId, final String deviceId) { Objects.requireNonNull(tenantId); - Objects.requireNonNull(replyId); + Objects.requireNonNull(deviceId); - return executeOrRunOnContext(result -> { - checkConnected() - .compose(ok -> CommandResponseSenderImpl.create(this, tenantId, replyId, onSenderClosed -> {})) - .setHandler(result); + return connection.executeOrRunOnContext(result -> { + final String key = getKey(tenantId, deviceId); + // stop liveness check + Optional.ofNullable(livenessChecks.remove(key)).ifPresent(connection.getVertx()::cancelTimer); + // close and remove link from cache + commandConsumerFactory.removeClient(key, consumer -> { + consumer.close(result); + }); }); } } diff --git a/client/src/main/java/org/eclipse/hono/client/impl/HonoConnectionImpl.java b/client/src/main/java/org/eclipse/hono/client/impl/HonoConnectionImpl.java index b7f58d6762..41e1ccefe5 100644 --- a/client/src/main/java/org/eclipse/hono/client/impl/HonoConnectionImpl.java +++ b/client/src/main/java/org/eclipse/hono/client/impl/HonoConnectionImpl.java @@ -16,7 +16,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -24,7 +23,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Supplier; import javax.security.sasl.AuthenticationException; @@ -33,7 +31,6 @@ import org.eclipse.hono.client.ClientErrorException; import org.eclipse.hono.client.DisconnectListener; import org.eclipse.hono.client.HonoConnection; -import org.eclipse.hono.client.MessageConsumer; import org.eclipse.hono.client.ReconnectListener; import org.eclipse.hono.client.ServerErrorException; import org.eclipse.hono.client.ServiceInvocationException; @@ -63,9 +60,10 @@ import io.vertx.proton.sasl.SaslSystemException; /** - * A helper class for creating Vert.x based clients for Hono's arbitrary APIs. + * A helper class for managing a vertx-proton based AMQP connection to a + * Hono service endpoint. *

    - * The client ensures that all interactions with the peer are performed on the + * The connection ensures that all interactions with the peer are performed on the * same vert.x {@code Context}. For this purpose the connect methods * either use the current Context or create a new Context for connecting to * the peer. This same Context is then used for all consecutive interactions with @@ -99,7 +97,6 @@ public class HonoConnectionImpl implements HonoConnection { */ protected volatile Context context; - private final List> creationRequests = new ArrayList<>(); private final List disconnectListeners = new ArrayList<>(); private final List reconnectListeners = new ArrayList<>(); private final AtomicBoolean connecting = new AtomicBoolean(false); @@ -271,13 +268,8 @@ protected boolean isConnectedInternal() { return connection != null && !connection.isDisconnected(); } - /** - * Checks if this client is shut down. - * - * @return {@code true} if this client is shut down already or is - * in the process of shutting down. - */ - protected final boolean isShutdown() { + @Override + public final boolean isShutdown() { return shuttingDown.get(); } @@ -341,6 +333,7 @@ public final Future connect(final ProtonClientOptions options) { /** * {@inheritDoc} */ + @Deprecated(forRemoval = true, since = "1.0-M2") @Override public final Future connect(final Handler disconnectHandler) { return connect(null, Objects.requireNonNull(disconnectHandler)); @@ -349,6 +342,7 @@ public final Future connect(final Handler disc /** * {@inheritDoc} */ + @Deprecated(forRemoval = true, since = "1.0-M2") @Override public final Future connect( final ProtonClientOptions options, @@ -480,7 +474,6 @@ protected void clearState() { setConnection(null); - failAllCreationRequests(); notifyDisconnectHandlers(); // make sure we make configured number of attempts to re-connect connectAttempts = new AtomicInteger(0); @@ -493,14 +486,6 @@ private void notifyDisconnectHandlers() { } } - private void failAllCreationRequests() { - - for (final Iterator> iter = creationRequests.iterator(); iter.hasNext();) { - iter.next().handle(null); - iter.remove(); - } - } - private void reconnect(final Handler> connectionHandler, final Handler disconnectHandler) { reconnect(null, connectionHandler, disconnectHandler); @@ -566,47 +551,6 @@ private void failConnectionAttempt(final Throwable connectionFailureCause, final } } - /** - * Creates a new message consumer for a tenant. - * - * @param tenantId The tenant to create the consumer for. - * @param newConsumerSupplier The factory to use for creating a new - * consumer. - * @return A future indicating the outcome of the operation. The future - * will be succeeded with the created consumer or will be - * failed with a {@link ServiceInvocationException} - * if the consumer cannot be created. - */ - protected Future createConsumer( - final String tenantId, - final Supplier> newConsumerSupplier) { - - return executeOrRunOnContext(result -> createConsumer(tenantId, newConsumerSupplier, result)); - } - - private void createConsumer( - final String tenantId, - final Supplier> newConsumerSupplier, - final Future result) { - - // register a handler to be notified if the underlying connection to the server fails - // so that we can fail the returned future - final Handler connectionFailureHandler = connectionLost -> { - result.tryFail( - new ServerErrorException(HttpURLConnection.HTTP_UNAVAILABLE, "connection to server lost")); - }; - creationRequests.add(connectionFailureHandler); - - newConsumerSupplier.get().setHandler(attempt -> { - creationRequests.remove(connectionFailureHandler); - if (attempt.succeeded()) { - result.tryComplete(attempt.result()); - } else { - result.tryFail(attempt.cause()); - } - }); - } - /** * {@inheritDoc} * diff --git a/client/src/test/java/org/eclipse/hono/client/impl/CommandConsumerFactoryImplTest.java b/client/src/test/java/org/eclipse/hono/client/impl/CommandConsumerFactoryImplTest.java index d0d163aaf9..2b2134e0ee 100644 --- a/client/src/test/java/org/eclipse/hono/client/impl/CommandConsumerFactoryImplTest.java +++ b/client/src/test/java/org/eclipse/hono/client/impl/CommandConsumerFactoryImplTest.java @@ -15,6 +15,7 @@ import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; @@ -26,11 +27,12 @@ import java.net.HttpURLConnection; -import org.apache.qpid.proton.amqp.transport.AmqpError; -import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.amqp.transport.Source; import org.eclipse.hono.client.CommandContext; +import org.eclipse.hono.client.DisconnectListener; +import org.eclipse.hono.client.HonoConnection; import org.eclipse.hono.client.MessageConsumer; +import org.eclipse.hono.client.ServerErrorException; import org.eclipse.hono.client.ServiceInvocationException; import org.eclipse.hono.config.ClientConfigProperties; import org.junit.Before; @@ -40,15 +42,14 @@ import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; -import io.vertx.core.AsyncResult; import io.vertx.core.Context; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Vertx; -import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.VertxUnitRunner; -import io.vertx.proton.ProtonConnection; +import io.vertx.proton.ProtonMessageHandler; +import io.vertx.proton.ProtonQoS; import io.vertx.proton.ProtonReceiver; @@ -63,15 +64,13 @@ public class CommandConsumerFactoryImplTest { * Global timeout for each test case. */ @Rule - public Timeout timeout = Timeout.seconds(3); + public Timeout timeout = Timeout.seconds(300); private Vertx vertx; private Context context; private ClientConfigProperties props; - private ProtonConnection con; - - private CommandConsumerFactoryImpl factory; - private DisconnectHandlerProvidingConnectionFactory connectionFactory; + private HonoConnection connection; + private CommandConsumerFactoryImpl commandConnection; private ProtonReceiver receiver; /** @@ -93,11 +92,15 @@ public void setUp() { props = new ClientConfigProperties(); receiver = mock(ProtonReceiver.class); - con = mock(ProtonConnection.class); - when(con.createReceiver(anyString())).thenReturn(receiver); - - connectionFactory = new DisconnectHandlerProvidingConnectionFactory(con); - factory = new CommandConsumerFactoryImpl(vertx, connectionFactory, props); + connection = HonoClientUnitTestHelper.mockHonoConnection(vertx, props); + when(connection.createReceiver( + anyString(), + any(ProtonQoS.class), + any(ProtonMessageHandler.class), + anyInt(), + any(Handler.class))).thenReturn(Future.succeededFuture(receiver)); + + commandConnection = new CommandConsumerFactoryImpl(connection); } /** @@ -112,20 +115,17 @@ public void testCreateCommandConsumerFailsIfPeerRejectsLink(final TestContext ct final Handler commandHandler = mock(Handler.class); final Handler closeHandler = mock(Handler.class); - final Source source = mock(Source.class); - when(source.getAddress()).thenReturn(null); - when(receiver.getRemoteSource()).thenReturn(source); - - factory.connect() - .compose(c -> { - final ArgumentCaptor>> linkOpenHandler = ArgumentCaptor.forClass(Handler.class); - final Future consumer = factory.createCommandConsumer("theTenant", "theDevice", commandHandler, closeHandler); - verify(con).createReceiver("control/theTenant/theDevice"); - verify(receiver).openHandler(linkOpenHandler.capture()); - verify(receiver).open(); - linkOpenHandler.getValue().handle(Future.succeededFuture()); - return consumer; - }).setHandler(ctx.asyncAssertFailure(t -> { + final ServerErrorException ex = new ServerErrorException(HttpURLConnection.HTTP_UNAVAILABLE); + when(connection.createReceiver( + anyString(), + any(ProtonQoS.class), + any(ProtonMessageHandler.class), + anyInt(), + any(Handler.class))) + .thenReturn(Future.failedFuture(ex)); + + commandConnection.createCommandConsumer("theTenant", "theDevice", commandHandler, closeHandler) + .setHandler(ctx.asyncAssertFailure(t -> { ctx.assertEquals(HttpURLConnection.HTTP_UNAVAILABLE, ((ServiceInvocationException) t).getErrorCode()); })); } @@ -140,24 +140,11 @@ public void testCreateCommandConsumerFailsIfPeerRejectsLink(final TestContext ct @Test public void testCreateCommandConsumerSucceeds(final TestContext ctx) { - final String address = "control/theTenant/theDevice"; final Handler commandHandler = mock(Handler.class); final Handler closeHandler = mock(Handler.class); - final Source source = mock(Source.class); - when(source.getAddress()).thenReturn(address); - when(receiver.getSource()).thenReturn(source); - when(receiver.getRemoteSource()).thenReturn(source); - factory.connect() - .compose(c -> { - final ArgumentCaptor>> linkOpenHandler = ArgumentCaptor.forClass(Handler.class); - final Future consumer = factory.createCommandConsumer("theTenant", "theDevice", commandHandler, closeHandler); - verify(con).createReceiver(address); - verify(receiver).openHandler(linkOpenHandler.capture()); - verify(receiver).open(); - linkOpenHandler.getValue().handle(Future.succeededFuture(receiver)); - return consumer; - }).setHandler(ctx.asyncAssertSuccess()); + commandConnection.createCommandConsumer("theTenant", "theDevice", commandHandler, closeHandler) + .setHandler(ctx.asyncAssertSuccess()); } /** @@ -173,35 +160,21 @@ public void testCreateCommandConsumerSetsRemoteCloseHandler(final TestContext ct final String address = "control/theTenant/theDevice"; final Handler commandHandler = mock(Handler.class); final Handler closeHandler = mock(Handler.class); - final Source source = mock(Source.class); - when(source.getAddress()).thenReturn(address); - when(receiver.getSource()).thenReturn(source); - when(receiver.getRemoteSource()).thenReturn(source); - // GIVEN a command consumer for which a close handler - // has been registered - factory.connect() - .compose(c -> { - final Future consumer = factory.createCommandConsumer("theTenant", "theDevice", commandHandler, closeHandler); - verify(con).createReceiver(address); - final ArgumentCaptor>> linkOpenHandler = ArgumentCaptor.forClass(Handler.class); - verify(receiver).openHandler(linkOpenHandler.capture()); - verify(receiver).open(); - linkOpenHandler.getValue().handle(Future.succeededFuture(receiver)); - return consumer; - }).map(c -> { - final ArgumentCaptor>> remoteCloseHandler = ArgumentCaptor.forClass(Handler.class); - verify(receiver).closeHandler(remoteCloseHandler.capture()); - // WHEN the peer closes the link - remoteCloseHandler.getValue().handle(Future.succeededFuture(receiver)); - // THEN the close handler is invoked - verify(closeHandler).handle(null); - return c; - }).setHandler(ctx.asyncAssertSuccess()); + commandConnection.createCommandConsumer("theTenant", "theDevice", commandHandler, closeHandler); + final ArgumentCaptor> captor = ArgumentCaptor.forClass(Handler.class); + verify(connection).createReceiver( + eq(address), + eq(ProtonQoS.AT_LEAST_ONCE), + any(ProtonMessageHandler.class), + eq(0), + captor.capture()); + captor.getValue().handle(address); + verify(closeHandler).handle(null); } /** - * Verifies that a command consumer's close method is invoked, + * Verifies that when a command consumer's close method is invoked, * then *

      *
    • the underlying link is closed,
    • @@ -221,43 +194,35 @@ public void testLocalCloseRemovesCommandConsumerFromCache(final TestContext ctx) when(source.getAddress()).thenReturn(address); when(receiver.getSource()).thenReturn(source); when(receiver.getRemoteSource()).thenReturn(source); + when(receiver.isOpen()).thenReturn(Boolean.TRUE); when(vertx.setPeriodic(anyLong(), any(Handler.class))).thenReturn(10L); // GIVEN a command consumer - factory.connect() - .compose(client -> { - final Future consumer = factory.createCommandConsumer("theTenant", "theDevice", commandHandler, null, 5000L); - verify(con).createReceiver(address); - final ArgumentCaptor>> linkOpenHandler = ArgumentCaptor.forClass(Handler.class); - verify(receiver).closeHandler(any(Handler.class)); - verify(receiver).openHandler(linkOpenHandler.capture()); - verify(receiver).open(); - linkOpenHandler.getValue().handle(Future.succeededFuture(receiver)); - when(receiver.isOpen()).thenReturn(Boolean.TRUE); - verify(vertx).setPeriodic(eq(5000L), any(Handler.class)); - return consumer; - }).map(consumer -> { - // WHEN closing the link locally - final Future localCloseHandler = Future.future(); - consumer.close(localCloseHandler.completer()); - final ArgumentCaptor>> closeHandler = ArgumentCaptor.forClass(Handler.class); - verify(receiver, times(2)).closeHandler(closeHandler.capture()); - verify(receiver).close(); - // and the peer sends its detach frame - closeHandler.getValue().handle(Future.succeededFuture(receiver)); - return null; - }).map(ok -> { - // THEN the liveness check is canceled - verify(vertx).cancelTimer(10L); - // and the next attempt to create a command consumer for the same address - final Future newConsumer = factory.createCommandConsumer("theTenant", "theDevice", commandHandler, null); - // results in a new link to be opened - verify(con, times(2)).createReceiver(address); - final ArgumentCaptor>> linkOpenHandler = ArgumentCaptor.forClass(Handler.class); - verify(receiver, times(2)).openHandler(linkOpenHandler.capture()); - linkOpenHandler.getValue().handle(Future.succeededFuture(receiver)); - return newConsumer; - }).setHandler(ctx.asyncAssertSuccess()); + commandConnection.createCommandConsumer("theTenant", "theDevice", commandHandler, null, 5000L) + .map(consumer -> { + verify(vertx).setPeriodic(eq(5000L), any(Handler.class)); + // WHEN closing the link locally + final Future localCloseHandler = Future.future(); + consumer.close(localCloseHandler); + final ArgumentCaptor> closeHandler = ArgumentCaptor.forClass(Handler.class); + verify(connection).closeAndFree(eq(receiver), closeHandler.capture()); + // and the peer sends its detach frame + closeHandler.getValue().handle(null); + return null; + }).map(ok -> { + // THEN the liveness check is canceled + verify(vertx).cancelTimer(10L); + // and the next attempt to create a command consumer for the same address + final Future newConsumer = commandConnection.createCommandConsumer("theTenant", "theDevice", commandHandler, null); + // results in a new link to be opened + verify(connection, times(2)).createReceiver( + eq(address), + eq(ProtonQoS.AT_LEAST_ONCE), + any(ProtonMessageHandler.class), + eq(0), + any(Handler.class)); + return newConsumer; + }).setHandler(ctx.asyncAssertSuccess()); } /** @@ -278,49 +243,42 @@ public void testConsumerIsRecreatedOnConnectionFailure(final TestContext ctx) { when(receiver.getSource()).thenReturn(source); when(receiver.getRemoteSource()).thenReturn(source); when(vertx.setPeriodic(anyLong(), any(Handler.class))).thenReturn(10L); + doAnswer(invocation -> { + final Handler handler = invocation.getArgument(1); + handler.handle(null); + return null; + }).when(connection).closeAndFree(any(), any(Handler.class)); // GIVEN a command connection with an established command consumer // which is checked periodically for liveness - factory.connect().setHandler(ctx.asyncAssertSuccess()); - assertTrue(connectionFactory.await()); - connectionFactory.setExpectedSucceedingConnectionAttempts(1); // intentionally using a check interval that is smaller than the minimum final long livenessCheckInterval = CommandConsumerFactoryImpl.MIN_LIVENESS_CHECK_INTERVAL_MILLIS - 1; - final Async consumerCreation = ctx.async(); - final Future commandConsumer = factory.createCommandConsumer( - "theTenant", "theDevice", commandHandler, closeHandler, eq(livenessCheckInterval)); - commandConsumer.setHandler(ctx.asyncAssertSuccess(ok -> consumerCreation.complete())); - verify(con).createReceiver(address); - final ArgumentCaptor>> linkOpenHandler = ArgumentCaptor.forClass(Handler.class); - verify(receiver).openHandler(linkOpenHandler.capture()); - verify(receiver).open(); - linkOpenHandler.getValue().handle(Future.succeededFuture(receiver)); + final Future commandConsumer = commandConnection.createCommandConsumer( + "theTenant", "theDevice", commandHandler, closeHandler, livenessCheckInterval); + assertTrue(commandConsumer.isComplete()); final ArgumentCaptor> livenessCheck = ArgumentCaptor.forClass(Handler.class); // the liveness check is registered with the minimum interval length verify(vertx).setPeriodic(eq(CommandConsumerFactoryImpl.MIN_LIVENESS_CHECK_INTERVAL_MILLIS), livenessCheck.capture()); - consumerCreation.await(); // WHEN the command connection fails - connectionFactory.getDisconnectHandler().handle(con); + final ArgumentCaptor disconnectListener = ArgumentCaptor.forClass(DisconnectListener.class); + verify(connection).addDisconnectListener(disconnectListener.capture()); + disconnectListener.getValue().onDisconnect(connection); // THEN the connection is re-established - assertTrue(connectionFactory.await()); + when(connection.isConnected()).thenReturn(Future.succeededFuture()); // and the liveness check re-creates the command consumer - final Async consumerRecreation = ctx.async(); - when(receiver.open()).thenAnswer(invocation -> { - consumerRecreation.complete(); - return receiver; - }); livenessCheck.getValue().handle(10L); - verify(con, times(2)).createReceiver(address); - verify(receiver, times(2)).openHandler(any(Handler.class)); - verify(receiver, times(2)).open(); - consumerRecreation.await(); - - // and when the consumer is finally closed - final Future localCloseHandler = mock(Future.class); - commandConsumer.result().close(localCloseHandler); + verify(connection, times(2)).createReceiver( + eq(address), + eq(ProtonQoS.AT_LEAST_ONCE), + any(ProtonMessageHandler.class), + eq(0), + any(Handler.class)); + + // and when the consumer is finally closed locally + commandConsumer.result().close(null); // then the liveness check has been canceled verify(vertx).cancelTimer(10L); } @@ -340,13 +298,15 @@ public void testLivenessCheckLocksRecreationAttempt(final TestContext ctx) { final String address = "control/theTenant/theDevice"; final Handler commandHandler = mock(Handler.class); final Handler remoteCloseHandler = mock(Handler.class); - - // GIVEN an established command connection - factory.connect().setHandler(ctx.asyncAssertSuccess()); - assertTrue(connectionFactory.await()); - connectionFactory.setExpectedSucceedingConnectionAttempts(1); - - final Handler livenessCheck = factory.newLivenessCheck("theTenant", "theDevice", "key", commandHandler, remoteCloseHandler); + final Handler livenessCheck = commandConnection.newLivenessCheck("theTenant", "theDevice", "key", commandHandler, remoteCloseHandler); + final Future createdReceiver = Future.future(); + when(connection.isConnected()).thenReturn(Future.succeededFuture()); + when(connection.createReceiver( + eq(address), + eq(ProtonQoS.AT_LEAST_ONCE), + any(ProtonMessageHandler.class), + anyInt(), + any(Handler.class))).thenReturn(createdReceiver); // WHEN the liveness check fires livenessCheck.handle(10L); @@ -354,17 +314,24 @@ public void testLivenessCheckLocksRecreationAttempt(final TestContext ctx) { livenessCheck.handle(10L); // THEN only one attempt has been made to recreate the consumer link - verify(con, times(1)).createReceiver(address); + verify(connection, times(1)).createReceiver( + eq(address), + eq(ProtonQoS.AT_LEAST_ONCE), + any(ProtonMessageHandler.class), + eq(0), + any(Handler.class)); // and when the first attempt has finally timed out - when(receiver.getRemoteCondition()).thenReturn(new ErrorCondition(AmqpError.INTERNAL_ERROR, "internal error")); - final ArgumentCaptor>> linkOpenHandler = ArgumentCaptor.forClass(Handler.class); - verify(receiver).openHandler(linkOpenHandler.capture()); - linkOpenHandler.getValue().handle(Future.failedFuture("internal error")); + createdReceiver.fail(new ServerErrorException(HttpURLConnection.HTTP_UNAVAILABLE)); // then the next run of the liveness check livenessCheck.handle(10L); // will start a new attempt to re-create the consumer link - verify(con, times(2)).createReceiver(address); + verify(connection, times(2)).createReceiver( + eq(address), + eq(ProtonQoS.AT_LEAST_ONCE), + any(ProtonMessageHandler.class), + eq(0), + any(Handler.class)); } } diff --git a/client/src/test/java/org/eclipse/hono/client/impl/HonoConnectionImplTest.java b/client/src/test/java/org/eclipse/hono/client/impl/HonoConnectionImplTest.java index 23e57dd7c2..55af5c52fa 100644 --- a/client/src/test/java/org/eclipse/hono/client/impl/HonoConnectionImplTest.java +++ b/client/src/test/java/org/eclipse/hono/client/impl/HonoConnectionImplTest.java @@ -244,40 +244,6 @@ public void testConnectFailsWithClientErrorForPermanentSaslSystemException(final assertTrue(connectionFactory.awaitFailure()); } - /** - * Verifies that a request to create a consumer is failed immediately when the - * underlying connection to the server fails. - * - * @param ctx The vert.x test context. - */ - @Test - public void testCreateConsumerFailsOnConnectionFailure(final TestContext ctx) { - - // GIVEN a connected client that already tries to create a telemetry sender for "tenant" - final Async connected = ctx.async(); - honoConnection.connect(new ProtonClientOptions()).setHandler(ctx.asyncAssertSuccess(ok -> connected.complete())); - connected.await(); - - final Async creationFailure = ctx.async(); - final Async supplierInvocation = ctx.async(); - honoConnection.createConsumer("tenant", () -> { - supplierInvocation.complete(); - return Future.future(); - }).setHandler(ctx.asyncAssertFailure(cause -> { - creationFailure.complete(); - })); - // wait until the consumer supplier has been invoked - // so that we can be sure that the disconnect handler for - // for the creation request has been registered - supplierInvocation.await(); - - // WHEN the underlying connection fails - connectionFactory.getDisconnectHandler().handle(con); - - // THEN all creation requests are failed - creationFailure.await(); - } - /** * Verifies that the client tries to re-establish a lost connection to a server. * diff --git a/service-base/src/main/java/org/eclipse/hono/service/AbstractAdapterConfig.java b/service-base/src/main/java/org/eclipse/hono/service/AbstractAdapterConfig.java index ab6c68cf8d..4eb94e71e9 100644 --- a/service-base/src/main/java/org/eclipse/hono/service/AbstractAdapterConfig.java +++ b/service-base/src/main/java/org/eclipse/hono/service/AbstractAdapterConfig.java @@ -23,7 +23,6 @@ import org.eclipse.hono.client.RegistrationClientFactory; import org.eclipse.hono.client.RequestResponseClientConfigProperties; import org.eclipse.hono.client.TenantClientFactory; -import org.eclipse.hono.client.impl.CommandConsumerFactoryImpl; import org.eclipse.hono.config.ApplicationConfigProperties; import org.eclipse.hono.config.ClientConfigProperties; import org.eclipse.hono.config.VertxProperties; @@ -299,7 +298,7 @@ public ClientConfigProperties commandConsumerFactoryConfig() { @Bean @Scope("prototype") public CommandConsumerFactory commandConsumerFactory() { - return new CommandConsumerFactoryImpl(vertx(), commandConsumerFactoryConfig()); + return CommandConsumerFactory.create(HonoConnection.newConnection(vertx(), commandConsumerFactoryConfig())); } /** diff --git a/site/content/release-notes.md b/site/content/release-notes.md index fa3cc730d0..a03ef8a221 100644 --- a/site/content/release-notes.md +++ b/site/content/release-notes.md @@ -25,12 +25,33 @@ title = "Release Notes" ### API Changes -* Several changes have been made to the `org.eclipse.hono.client.MessageSender` interface: +* The `hono-client` module has undergone several major and incompatible changes. The most + important change affects the `HonoClient` interface which no longer serves as a factory + for the arbitrary clients to the Hono service endpoints. + It has been renamed to `HonoConnection` and now only represents the underlying + AMQP connection to a peer and provides methods for managing the connection state + and registering listeners for arbitrary life-cycle events of the connection. + In addition to this, several factory interfaces have been added which can be used + to create specific clients to Hono's arbitrary services. All of the former `HonoClient` + interface's factory methods have been distributed accordingly to: + * `org.eclipse.hono.client.ApplicationClientFactory` for creating clients to + Hono's north bound Telemetry, Event and Command & Control API. + * `org.eclipse.hono.client.DownstreamSenderFactory` for creating clients to + Hono's south bound Telemetry and Event APIs. + * `org.eclipse.hono.client.CommandConsumerFactory` for creating clients to + Hono's south bound Command & Control API. + * `org.eclipse.hono.client.TenantClientFactory` for creating clients to + Hono's Tenant API. + * `org.eclipse.hono.client.RegistrationClientFactory` for creating clients to + Hono's Device Registration API. + * `org.eclipse.hono.client.CredentialsClientFactory` for creating clients to + Hono's Credentials API. +* In this context the `org.eclipse.hono.client.MessageSender` interface has been changed as follows: * The *send* methods have been changed to no longer accept a *registration assertion token* which became obsolete with the removal of the *Hono Messaging* component. * The *isRegistrationAssertionRequired* method has been removed from the interface. * All *send* method variants which accept specific message parameters have been moved into - the new `org.eclipse.hono.client.DownstreamSender` interface which extends + the new `org.eclipse.hono.client.DownstreamSender` interface which extends the existing `MessageSender`. * Several changes have been made to the `org.eclipse.hono.service.AbstractProtocolAdapterBase` class: @@ -82,6 +103,12 @@ title = "Release Notes" have been deprecated. They will be removed from Hono 1.0 altogether. A new HTTP based API will be defined instead which can then be used to *manage* the content of a device registry. +* `org.eclipse.hono.client.HonoConnection`'s *connect* method variants accepting + a disconnect handler have been deprecated and will be removed in Hono 1.0. + Client code should use one of the other *connect* methods instead and register a + `org.eclipse.hono.client.DisconnectListener` and/or a + `org.eclipse.hono.client.ReconnectListener` to get notified about life-cycle + events of the underlying AMQP connection. ## 1.0-M1