diff --git a/client/src/main/java/org/eclipse/hono/client/CommandConsumerFactory.java b/client/src/main/java/org/eclipse/hono/client/CommandConsumerFactory.java
index dc71c6a179..4c9b4a3b5c 100644
--- a/client/src/main/java/org/eclipse/hono/client/CommandConsumerFactory.java
+++ b/client/src/main/java/org/eclipse/hono/client/CommandConsumerFactory.java
@@ -14,6 +14,7 @@
package org.eclipse.hono.client;
import org.eclipse.hono.client.impl.CommandConsumer;
+import org.eclipse.hono.client.impl.CommandConsumerFactoryImpl;
import io.vertx.core.Future;
import io.vertx.core.Handler;
@@ -24,6 +25,17 @@
*/
public interface CommandConsumerFactory extends ConnectionLifecycle {
+ /**
+ * Creates a new factory for an existing connection.
+ *
+ * @param connection The connection to use.
+ * @return The factory.
+ * @throws NullPointerException if connection is {@code null}
+ */
+ static CommandConsumerFactory create(final HonoConnection connection) {
+ return new CommandConsumerFactoryImpl(connection);
+ }
+
/**
* Creates a command consumer for a device.
*
diff --git a/client/src/main/java/org/eclipse/hono/client/HonoConnection.java b/client/src/main/java/org/eclipse/hono/client/HonoConnection.java
index 0d5ddab64e..6de20009a3 100644
--- a/client/src/main/java/org/eclipse/hono/client/HonoConnection.java
+++ b/client/src/main/java/org/eclipse/hono/client/HonoConnection.java
@@ -31,43 +31,38 @@
import io.vertx.proton.ProtonSender;
/**
- * A factory for creating clients for Hono's arbitrary APIs.
+ * A wrapper around a single AMQP 1.0 connection and a single session to a Hono service endpoint.
*
- * A factory maintains a single AMQP 1.0 connection and a single session to the peer. This session is shared by all AMQP
- * 1.0 links established for senders, consumers and clients created using the corresponding
- * factory methods.
+ * The session is shared by all AMQP 1.0 links established for sender and receiver
+ * links created using the corresponding factory methods.
*
- * The getOrCreate factory methods return an existing client for the given address if available. Note that
- * factory methods for creating consumers always return a new instance so that all messages received are only
- * processed by the handler passed in to the factory method.
+ * Before any links can be created, the underlying AMQP connection needs to be established. This is done by
+ * invoking one of the connect methods.
*
- * Before any of the factory methods can be invoked successfully, the client needs to connect to Hono. This is done by
- * invoking one of the client's connect methods.
- *
- * An AMQP connection is established in multiple stages:
+ * This class represents the client side when establishing the AMQP connection:
*
- * - The client establishes a TCP connection to the peer. For this to succeed, the peer must have registered a
+ *
- The client tries to establish a TCP/TLS connection to the peer. For this to succeed, the peer must have registered a
* socket listener on the IP address and port that the client is configured to use.
- * - The client performs a SASL handshake with the peer if required by the peer. The client needs to be
+ *
- The client initiates a SASL handshake if requested by the peer. The client needs to be
* configured with correct credentials in order for this stage to succeed.
* - Finally, the client and the peer need to agree on AMQP 1.0 specific connection parameters like capabilities
* and session window size.
*
* Some of the connect methods accept a {@code ProtonClientOptions} type parameter. Note that these options
- * only influence the client's behavior when establishing the TCP connection with the peer. The overall behavior of
+ * only influence the client's behavior when establishing the TCP/TLS connection with the peer. The overall behavior of
* the client regarding the establishment of the AMQP connection must be configured using the
- * {@link ClientConfigProperties} passed in to the newClient method. In particular, the
- * {@link ClientConfigProperties#setReconnectAttempts(int)} method can be used to specify, how many
- * times the client should try to establish a connection to the peer before giving up.
+ * {@link ClientConfigProperties} passed in to the {@link #newConnection(Vertx, ClientConfigProperties)} method.
+ * In particular, the {@link ClientConfigProperties#setReconnectAttempts(int)} method can be used to specify,
+ * how many times the client should try to establish a connection to the peer before giving up.
*
* NB When the client tries to establish a connection to the peer, it stores the current
* vert.x {@code Context} in a local variable and performs all further interactions with the peer running
- * on this Context. Invoking any of the methods of the client from a vert.x Context other than the one
- * used for establishing the connection may cause race conditions or even deadlocks because the handlers
+ * on this Context. Invoking any of the connection's methods from a vert.x Context other than the one
+ * used for establishing the connection may cause race conditions or even deadlocks, because the handlers
* registered on the {@code Future}s returned by these methods will be invoked from the stored Context.
- * It is the invoking code's responsibility to either ensure that the client's methods are always invoked
+ * It is the invoking code's responsibility to either ensure that the connection's methods are always invoked
* from the same Context or to make sure that the handlers are running on the correct Context, e.g. by using
- * the {@code Context}'s runOnContext method.
+ * the {@link #executeOrRunOnContext(Handler)} method.
*/
public interface HonoConnection extends ConnectionLifecycle {
@@ -115,19 +110,19 @@ static HonoConnection newConnection(final Vertx vertx, final ClientConfigPropert
* {@inheritDoc}
*
* The connection will be established using default options.
- * With the default options a client will try three times to establish a TCP connection to the peer
- * before giving up. Each attempt will be canceled after 200ms and the client will wait 500ms
- * before making the next attempt. Note that each connection attempt is made using the same IP
- * address that has been resolved when the method was initially invoked.
+ *
+ * With the default options the client will try to establish a TCP connection to the peer a single
+ * time and then give up.
*
* Once a TCP connection is established, the client performs a SASL handshake (if requested by the
* peer) using the credentials set in the {@link ClientConfigProperties}. Finally, the client
* opens the AMQP connection to the peer, based on the negotiated parameters.
*
- * The number of times that the client should try to establish the AMQP connection with the peer
+ * The overall number of times that the client should try to establish the AMQP connection with the peer
* can be configured by means of the connectAttempts property of the
* {@code ClientConfigProperties} passed in to the {@link #newConnection(Vertx, ClientConfigProperties)}
- * method.
+ * method. The client will perform a new DNS lookup of the peer's hostname with each attempt to
+ * establish the AMQP connection.
*
* When an established connection to the peer fails, the client will automatically try to re-connect
* to the peer using the same options and behavior as used for establishing the initial connection.
@@ -160,7 +155,8 @@ static HonoConnection newConnection(final Vertx vertx, final ClientConfigPropert
* The number of times that the client should try to establish the AMQP connection with the peer
* can be configured by means of the connectAttempts property of the
* {@code ClientConfigProperties} passed in to the {@link #newConnection(Vertx, ClientConfigProperties)}
- * method.
+ * method. The client will perform a new DNS lookup of the peer's hostname with each attempt to
+ * establish the AMQP connection.
*
* When an established connection to the peer fails, the client will automatically try to re-connect
* to the peer using the same options and behavior as used for establishing the initial connection.
@@ -193,7 +189,8 @@ static HonoConnection newConnection(final Vertx vertx, final ClientConfigPropert
* The number of times that the client should try to establish the AMQP connection with the peer
* can be configured by means of the connectAttempts property of the
* {@code ClientConfigProperties} passed in to the {@link #newConnection(Vertx, ClientConfigProperties)}
- * method.
+ * method. The client will perform a new DNS lookup of the peer's hostname with each attempt to
+ * establish the AMQP connection.
*
* When an established connection to the peer fails, the given disconnect handler will be invoked.
* Note that the client will not automatically try to re-connect to the peer in this case.
@@ -207,7 +204,10 @@ static HonoConnection newConnection(final Vertx vertx, final ClientConfigPropert
* established.
*
* @throws NullPointerException if the disconnect handler is {@code null}.
+ * @deprecated Use one of the other connect methods instead and use {@link #addDisconnectListener(DisconnectListener)}
+ * to be notified when the underlying AMQP connection fails unexpectedly.
*/
+ @Deprecated(forRemoval = true, since = "1.0-M2")
Future connect(Handler disconnectHandler);
/**
@@ -225,7 +225,8 @@ static HonoConnection newConnection(final Vertx vertx, final ClientConfigPropert
* The number of times that the client should try to establish the AMQP connection with the peer
* can be configured by means of the connectAttempts property of the
* {@code ClientConfigProperties} passed in to the {@link #newConnection(Vertx, ClientConfigProperties)}
- * method.
+ * method. The client will perform a new DNS lookup of the peer's hostname with each attempt to
+ * establish the AMQP connection.
*
* When an established connection to the peer fails, the given disconnect handler will be invoked.
* Note that the client will not automatically try to re-connect to the peer in this case.
@@ -242,7 +243,10 @@ static HonoConnection newConnection(final Vertx vertx, final ClientConfigPropert
* established, or
*
the maximum number of (unsuccessful) (re-)connection attempts have been made.
*
+ * @deprecated Use one of the other connect methods instead and use {@link #addDisconnectListener(DisconnectListener)}
+ * to be notified when the underlying AMQP connection fails unexpectedly.
*/
+ @Deprecated(forRemoval = true, since = "1.0-M2")
Future connect(
ProtonClientOptions options,
Handler disconnectHandler);
@@ -282,6 +286,14 @@ Future connect(
*/
void shutdown(Handler> completionHandler);
+ /**
+ * Checks if this connection is shut down.
+ *
+ * @return {@code true} if this connection is shut down already
+ * or is in the process of shutting down.
+ */
+ boolean isShutdown();
+
/**
* Checks if this client supports a certain capability.
*
diff --git a/client/src/main/java/org/eclipse/hono/client/impl/CommandConsumerFactoryImpl.java b/client/src/main/java/org/eclipse/hono/client/impl/CommandConsumerFactoryImpl.java
index ca52834aff..c26b86c948 100644
--- a/client/src/main/java/org/eclipse/hono/client/impl/CommandConsumerFactoryImpl.java
+++ b/client/src/main/java/org/eclipse/hono/client/impl/CommandConsumerFactoryImpl.java
@@ -23,19 +23,19 @@
import org.eclipse.hono.client.CommandConsumerFactory;
import org.eclipse.hono.client.CommandContext;
import org.eclipse.hono.client.CommandResponseSender;
+import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.MessageConsumer;
import org.eclipse.hono.client.ResourceConflictException;
-import org.eclipse.hono.config.ClientConfigProperties;
-import org.eclipse.hono.connection.ConnectionFactory;
+import org.eclipse.hono.client.impl.CommandConsumer;
import io.vertx.core.Future;
import io.vertx.core.Handler;
-import io.vertx.core.Vertx;
/**
- * Implements a connection between an Adapter and the AMQP 1.0 network to receive commands and send a response.
+ * A factory for creating clients for the AMQP 1.0 Messaging Network to
+ * receive commands and send responses.
*/
-public class CommandConsumerFactoryImpl extends HonoConnectionImpl implements CommandConsumerFactory {
+public class CommandConsumerFactoryImpl extends AbstractHonoClientFactory implements CommandConsumerFactory {
/**
* The minimum number of milliseconds to wait between checking a
@@ -43,11 +43,8 @@ public class CommandConsumerFactoryImpl extends HonoConnectionImpl implements Co
*/
public static final long MIN_LIVENESS_CHECK_INTERVAL_MILLIS = 2000;
- /**
- * The consumers that can be used to receive command messages.
- * The device, which belongs to a tenant is used as the key, e.g. DEFAULT_TENANT/4711.
- */
- private final Map commandConsumers = new HashMap<>();
+ private final CachingClientFactory commandConsumerFactory;
+
/**
* A mapping of command consumer addresses to vert.x timer IDs which represent the
* liveness checks for the consumers.
@@ -55,41 +52,22 @@ public class CommandConsumerFactoryImpl extends HonoConnectionImpl implements Co
private final Map livenessChecks = new HashMap<>();
/**
- * Creates a new client for a set of configuration properties.
- *
- * This constructor creates a connection factory using
- * {@link ConnectionFactory#newConnectionFactory(Vertx, ClientConfigProperties)}.
- *
- * @param vertx The Vert.x instance to execute the client on, if {@code null} a new Vert.x instance is used.
- * @param clientConfigProperties The configuration properties to use.
- * @throws NullPointerException if clientConfigProperties is {@code null}
+ * Creates a new factory for an existing connection.
+ *
+ * @param connection The connection to use.
*/
- public CommandConsumerFactoryImpl(final Vertx vertx, final ClientConfigProperties clientConfigProperties) {
- super(vertx, clientConfigProperties);
+ public CommandConsumerFactoryImpl(final HonoConnection connection) {
+ super(connection);
+ commandConsumerFactory = new CachingClientFactory<>(c -> true);
}
- /**
- * Creates a new client for a set of configuration properties.
- *
- * This constructor creates a connection factory using
- * {@link ConnectionFactory#newConnectionFactory(Vertx, ClientConfigProperties)}.
- *
- * @param vertx The Vert.x instance to execute the client on, if {@code null} a new Vert.x instance is used.
- * @param connectionFactory Factory to invoke for a new connection.
- * @param clientConfigProperties The configuration properties to use.
- * @throws NullPointerException if clientConfigProperties is {@code null}
- */
- public CommandConsumerFactoryImpl(final Vertx vertx, final ConnectionFactory connectionFactory, final ClientConfigProperties clientConfigProperties) {
- super(vertx, connectionFactory, clientConfigProperties);
+ @Override
+ protected void onDisconnect() {
+ commandConsumerFactory.clearState();
}
- /**
- * {@inheritDoc}
- */
- @Override
- protected void clearState() {
- super.clearState();
- commandConsumers.clear();
+ private String getKey(final String tenantId, final String deviceId) {
+ return Device.asAddress(tenantId, deviceId);
}
/**
@@ -99,28 +77,24 @@ protected void clearState() {
public final Future createCommandConsumer(
final String tenantId,
final String deviceId,
- final Handler commandConsumer,
+ final Handler commandHandler,
final Handler remoteCloseHandler) {
Objects.requireNonNull(tenantId);
Objects.requireNonNull(deviceId);
- Objects.requireNonNull(commandConsumer);
+ Objects.requireNonNull(commandHandler);
- return executeOrRunOnContext(result -> {
- final String key = Device.asAddress(tenantId, deviceId);
- final MessageConsumer messageConsumer = commandConsumers.get(key);
- if (messageConsumer != null) {
+ return connection.executeOrRunOnContext(result -> {
+ final String key = getKey(tenantId, deviceId);
+ final MessageConsumer commandConsumer = commandConsumerFactory.getClient(key);
+ if (commandConsumer != null) {
log.debug("cannot create concurrent command consumer [tenant: {}, device-id: {}]", tenantId, deviceId);
result.fail(new ResourceConflictException("message consumer already in use"));
} else {
- createConsumer(
- tenantId,
- () -> newCommandConsumer(tenantId, deviceId, commandConsumer, remoteCloseHandler))
- .map(consumer -> {
- commandConsumers.put(key, consumer);
- return consumer;
- })
- .setHandler(result);
+ commandConsumerFactory.getOrCreateClient(
+ key,
+ () -> newCommandConsumer(tenantId, deviceId, commandHandler, remoteCloseHandler),
+ result);
}
});
}
@@ -147,16 +121,17 @@ public final Future createCommandConsumer(
throw new IllegalArgumentException("liveness check interval must be > 0");
}
- return createCommandConsumer(tenantId, deviceId, commandConsumer, remoteCloseHandler).map(c -> {
-
- final String key = Device.asAddress(tenantId, deviceId);
- final long effectiveCheckInterval = Math.max(MIN_LIVENESS_CHECK_INTERVAL_MILLIS, checkInterval);
- final long livenessCheckId = vertx.setPeriodic(
- effectiveCheckInterval,
- newLivenessCheck(tenantId, deviceId, key, commandConsumer, remoteCloseHandler));
- livenessChecks.put(key, livenessCheckId);
- return c;
- });
+ return createCommandConsumer(tenantId, deviceId, commandConsumer, remoteCloseHandler)
+ .map(c -> {
+
+ final String key = getKey(tenantId, deviceId);
+ final long effectiveCheckInterval = Math.max(MIN_LIVENESS_CHECK_INTERVAL_MILLIS, checkInterval);
+ final long livenessCheckId = connection.getVertx().setPeriodic(
+ effectiveCheckInterval,
+ newLivenessCheck(tenantId, deviceId, key, commandConsumer, remoteCloseHandler));
+ livenessChecks.put(key, livenessCheckId);
+ return c;
+ });
}
Handler newLivenessCheck(
@@ -168,34 +143,40 @@ Handler newLivenessCheck(
final AtomicBoolean recreating = new AtomicBoolean(false);
return timerId -> {
- if (isShutdown()) {
- vertx.cancelTimer(timerId);
- } else if (isConnectedInternal() && !commandConsumers.containsKey(key)) {
- // when a connection is lost unexpectedly,
- // all consumers will be removed from the cache
- if (recreating.compareAndSet(false, true)) {
- // set a lock in order to prevent spawning multiple attempts
- // to re-create the consumer
- log.debug("trying to re-create command consumer [tenant: {}, device-id: {}]",
- tenantId, deviceId);
- // we try to re-create the link using the original parameters
- // which will put the consumer into the cache again, if successful
- createCommandConsumer(tenantId, deviceId, commandConsumer, remoteCloseHandler)
- .map(consumer -> {
- log.debug("successfully re-created command consumer [tenant: {}, device-id: {}]",
- tenantId, deviceId);
- return consumer;
- })
- .otherwise(t -> {
- log.info("failed to re-create command consumer [tenant: {}, device-id: {}]: {}",
- tenantId, deviceId, t.getMessage());
- return null;
- })
- .setHandler(s -> recreating.compareAndSet(true, false));
- } else {
- log.debug("already trying to re-create command consumer [tenant: {}, device-id: {}], yielding ...",
- tenantId, deviceId);
- }
+ if (connection.isShutdown()) {
+ connection.getVertx().cancelTimer(timerId);
+ } else {
+ connection.isConnected().map(ok -> {
+ if (commandConsumerFactory.getClient(key) == null) {
+ // when a connection is lost unexpectedly,
+ // all consumers will have been removed from the cache
+ // so we need to recreate the consumer
+ if (recreating.compareAndSet(false, true)) {
+ // set a lock in order to prevent spawning multiple attempts
+ // to re-create the consumer
+ log.debug("trying to re-create command consumer [tenant: {}, device-id: {}]",
+ tenantId, deviceId);
+ // we try to re-create the link using the original parameters
+ // which will put the consumer into the cache again, if successful
+ createCommandConsumer(tenantId, deviceId, commandConsumer, remoteCloseHandler)
+ .map(consumer -> {
+ log.debug("successfully re-created command consumer [tenant: {}, device-id: {}]",
+ tenantId, deviceId);
+ return consumer;
+ })
+ .otherwise(t -> {
+ log.info("failed to re-create command consumer [tenant: {}, device-id: {}]: {}",
+ tenantId, deviceId, t.getMessage());
+ return null;
+ })
+ .setHandler(s -> recreating.compareAndSet(true, false));
+ } else {
+ log.debug("already trying to re-create command consumer [tenant: {}, device-id: {}], yielding ...",
+ tenantId, deviceId);
+ }
+ }
+ return null;
+ });
}
};
}
@@ -206,64 +187,64 @@ private Future newCommandConsumer(
final Handler commandConsumer,
final Handler remoteCloseHandler) {
- return checkConnected().compose(con -> {
- final String key = Device.asAddress(tenantId, deviceId);
- return CommandConsumer.create(
- this,
+ final String key = Device.asAddress(tenantId, deviceId);
+ return CommandConsumer.create(
+ connection,
tenantId,
deviceId,
commandConsumer,
sourceAddress -> { // local close hook
// stop liveness check
- Optional.ofNullable(livenessChecks.remove(key)).ifPresent(vertx::cancelTimer);
- commandConsumers.remove(key);
+ Optional.ofNullable(livenessChecks.remove(key)).ifPresent(connection.getVertx()::cancelTimer);
+ commandConsumerFactory.removeClient(key);
},
sourceAddress -> { // remote close hook
- commandConsumers.remove(key);
+ commandConsumerFactory.removeClient(key);
remoteCloseHandler.handle(null);
- })
- .map(c -> (MessageConsumer) c);
- });
+ }).map(c -> (MessageConsumer) c);
}
/**
* {@inheritDoc}
+ *
+ * This implementation always creates a new sender link.
*/
- @Deprecated
@Override
- public Future closeCommandConsumer(final String tenantId, final String deviceId) {
+ public Future getCommandResponseSender(
+ final String tenantId,
+ final String replyId) {
Objects.requireNonNull(tenantId);
- Objects.requireNonNull(deviceId);
+ Objects.requireNonNull(replyId);
- return executeOrRunOnContext(result -> {
- final String deviceAddress = Device.asAddress(tenantId, deviceId);
- // stop liveness check
- Optional.ofNullable(livenessChecks.remove(deviceAddress)).ifPresent(vertx::cancelTimer);
- // close and remove link from cache
- Optional.ofNullable(commandConsumers.remove(deviceAddress)).ifPresent(consumer -> {
- consumer.close(result);
- });
+ return connection.executeOrRunOnContext(result -> {
+ CommandResponseSenderImpl.create(
+ connection,
+ tenantId,
+ replyId,
+ onRemoteClose -> {})
+ .setHandler(result);
});
}
/**
* {@inheritDoc}
- *
- * This implementation always creates a new sender link.
*/
+ @Deprecated
@Override
- public Future getCommandResponseSender(
- final String tenantId,
- final String replyId) {
+ public Future closeCommandConsumer(final String tenantId, final String deviceId) {
Objects.requireNonNull(tenantId);
- Objects.requireNonNull(replyId);
+ Objects.requireNonNull(deviceId);
- return executeOrRunOnContext(result -> {
- checkConnected()
- .compose(ok -> CommandResponseSenderImpl.create(this, tenantId, replyId, onSenderClosed -> {}))
- .setHandler(result);
+ return connection.executeOrRunOnContext(result -> {
+ final String key = getKey(tenantId, deviceId);
+ // stop liveness check
+ Optional.ofNullable(livenessChecks.remove(key)).ifPresent(connection.getVertx()::cancelTimer);
+ // close and remove link from cache
+ commandConsumerFactory.removeClient(key, consumer -> {
+ consumer.close(result);
+ });
});
}
}
diff --git a/client/src/main/java/org/eclipse/hono/client/impl/HonoConnectionImpl.java b/client/src/main/java/org/eclipse/hono/client/impl/HonoConnectionImpl.java
index b7f58d6762..41e1ccefe5 100644
--- a/client/src/main/java/org/eclipse/hono/client/impl/HonoConnectionImpl.java
+++ b/client/src/main/java/org/eclipse/hono/client/impl/HonoConnectionImpl.java
@@ -16,7 +16,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -24,7 +23,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Supplier;
import javax.security.sasl.AuthenticationException;
@@ -33,7 +31,6 @@
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.DisconnectListener;
import org.eclipse.hono.client.HonoConnection;
-import org.eclipse.hono.client.MessageConsumer;
import org.eclipse.hono.client.ReconnectListener;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
@@ -63,9 +60,10 @@
import io.vertx.proton.sasl.SaslSystemException;
/**
- * A helper class for creating Vert.x based clients for Hono's arbitrary APIs.
+ * A helper class for managing a vertx-proton based AMQP connection to a
+ * Hono service endpoint.
*
- * The client ensures that all interactions with the peer are performed on the
+ * The connection ensures that all interactions with the peer are performed on the
* same vert.x {@code Context}. For this purpose the connect methods
* either use the current Context or create a new Context for connecting to
* the peer. This same Context is then used for all consecutive interactions with
@@ -99,7 +97,6 @@ public class HonoConnectionImpl implements HonoConnection {
*/
protected volatile Context context;
- private final List> creationRequests = new ArrayList<>();
private final List disconnectListeners = new ArrayList<>();
private final List reconnectListeners = new ArrayList<>();
private final AtomicBoolean connecting = new AtomicBoolean(false);
@@ -271,13 +268,8 @@ protected boolean isConnectedInternal() {
return connection != null && !connection.isDisconnected();
}
- /**
- * Checks if this client is shut down.
- *
- * @return {@code true} if this client is shut down already or is
- * in the process of shutting down.
- */
- protected final boolean isShutdown() {
+ @Override
+ public final boolean isShutdown() {
return shuttingDown.get();
}
@@ -341,6 +333,7 @@ public final Future connect(final ProtonClientOptions options) {
/**
* {@inheritDoc}
*/
+ @Deprecated(forRemoval = true, since = "1.0-M2")
@Override
public final Future connect(final Handler disconnectHandler) {
return connect(null, Objects.requireNonNull(disconnectHandler));
@@ -349,6 +342,7 @@ public final Future connect(final Handler disc
/**
* {@inheritDoc}
*/
+ @Deprecated(forRemoval = true, since = "1.0-M2")
@Override
public final Future connect(
final ProtonClientOptions options,
@@ -480,7 +474,6 @@ protected void clearState() {
setConnection(null);
- failAllCreationRequests();
notifyDisconnectHandlers();
// make sure we make configured number of attempts to re-connect
connectAttempts = new AtomicInteger(0);
@@ -493,14 +486,6 @@ private void notifyDisconnectHandlers() {
}
}
- private void failAllCreationRequests() {
-
- for (final Iterator> iter = creationRequests.iterator(); iter.hasNext();) {
- iter.next().handle(null);
- iter.remove();
- }
- }
-
private void reconnect(final Handler> connectionHandler,
final Handler disconnectHandler) {
reconnect(null, connectionHandler, disconnectHandler);
@@ -566,47 +551,6 @@ private void failConnectionAttempt(final Throwable connectionFailureCause, final
}
}
- /**
- * Creates a new message consumer for a tenant.
- *
- * @param tenantId The tenant to create the consumer for.
- * @param newConsumerSupplier The factory to use for creating a new
- * consumer.
- * @return A future indicating the outcome of the operation. The future
- * will be succeeded with the created consumer or will be
- * failed with a {@link ServiceInvocationException}
- * if the consumer cannot be created.
- */
- protected Future createConsumer(
- final String tenantId,
- final Supplier> newConsumerSupplier) {
-
- return executeOrRunOnContext(result -> createConsumer(tenantId, newConsumerSupplier, result));
- }
-
- private void createConsumer(
- final String tenantId,
- final Supplier> newConsumerSupplier,
- final Future result) {
-
- // register a handler to be notified if the underlying connection to the server fails
- // so that we can fail the returned future
- final Handler connectionFailureHandler = connectionLost -> {
- result.tryFail(
- new ServerErrorException(HttpURLConnection.HTTP_UNAVAILABLE, "connection to server lost"));
- };
- creationRequests.add(connectionFailureHandler);
-
- newConsumerSupplier.get().setHandler(attempt -> {
- creationRequests.remove(connectionFailureHandler);
- if (attempt.succeeded()) {
- result.tryComplete(attempt.result());
- } else {
- result.tryFail(attempt.cause());
- }
- });
- }
-
/**
* {@inheritDoc}
*
diff --git a/client/src/test/java/org/eclipse/hono/client/impl/CommandConsumerFactoryImplTest.java b/client/src/test/java/org/eclipse/hono/client/impl/CommandConsumerFactoryImplTest.java
index d0d163aaf9..2b2134e0ee 100644
--- a/client/src/test/java/org/eclipse/hono/client/impl/CommandConsumerFactoryImplTest.java
+++ b/client/src/test/java/org/eclipse/hono/client/impl/CommandConsumerFactoryImplTest.java
@@ -15,6 +15,7 @@
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
@@ -26,11 +27,12 @@
import java.net.HttpURLConnection;
-import org.apache.qpid.proton.amqp.transport.AmqpError;
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.Source;
import org.eclipse.hono.client.CommandContext;
+import org.eclipse.hono.client.DisconnectListener;
+import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.MessageConsumer;
+import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.config.ClientConfigProperties;
import org.junit.Before;
@@ -40,15 +42,14 @@
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
-import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
-import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
-import io.vertx.proton.ProtonConnection;
+import io.vertx.proton.ProtonMessageHandler;
+import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
@@ -63,15 +64,13 @@ public class CommandConsumerFactoryImplTest {
* Global timeout for each test case.
*/
@Rule
- public Timeout timeout = Timeout.seconds(3);
+ public Timeout timeout = Timeout.seconds(300);
private Vertx vertx;
private Context context;
private ClientConfigProperties props;
- private ProtonConnection con;
-
- private CommandConsumerFactoryImpl factory;
- private DisconnectHandlerProvidingConnectionFactory connectionFactory;
+ private HonoConnection connection;
+ private CommandConsumerFactoryImpl commandConnection;
private ProtonReceiver receiver;
/**
@@ -93,11 +92,15 @@ public void setUp() {
props = new ClientConfigProperties();
receiver = mock(ProtonReceiver.class);
- con = mock(ProtonConnection.class);
- when(con.createReceiver(anyString())).thenReturn(receiver);
-
- connectionFactory = new DisconnectHandlerProvidingConnectionFactory(con);
- factory = new CommandConsumerFactoryImpl(vertx, connectionFactory, props);
+ connection = HonoClientUnitTestHelper.mockHonoConnection(vertx, props);
+ when(connection.createReceiver(
+ anyString(),
+ any(ProtonQoS.class),
+ any(ProtonMessageHandler.class),
+ anyInt(),
+ any(Handler.class))).thenReturn(Future.succeededFuture(receiver));
+
+ commandConnection = new CommandConsumerFactoryImpl(connection);
}
/**
@@ -112,20 +115,17 @@ public void testCreateCommandConsumerFailsIfPeerRejectsLink(final TestContext ct
final Handler commandHandler = mock(Handler.class);
final Handler closeHandler = mock(Handler.class);
- final Source source = mock(Source.class);
- when(source.getAddress()).thenReturn(null);
- when(receiver.getRemoteSource()).thenReturn(source);
-
- factory.connect()
- .compose(c -> {
- final ArgumentCaptor>> linkOpenHandler = ArgumentCaptor.forClass(Handler.class);
- final Future consumer = factory.createCommandConsumer("theTenant", "theDevice", commandHandler, closeHandler);
- verify(con).createReceiver("control/theTenant/theDevice");
- verify(receiver).openHandler(linkOpenHandler.capture());
- verify(receiver).open();
- linkOpenHandler.getValue().handle(Future.succeededFuture());
- return consumer;
- }).setHandler(ctx.asyncAssertFailure(t -> {
+ final ServerErrorException ex = new ServerErrorException(HttpURLConnection.HTTP_UNAVAILABLE);
+ when(connection.createReceiver(
+ anyString(),
+ any(ProtonQoS.class),
+ any(ProtonMessageHandler.class),
+ anyInt(),
+ any(Handler.class)))
+ .thenReturn(Future.failedFuture(ex));
+
+ commandConnection.createCommandConsumer("theTenant", "theDevice", commandHandler, closeHandler)
+ .setHandler(ctx.asyncAssertFailure(t -> {
ctx.assertEquals(HttpURLConnection.HTTP_UNAVAILABLE, ((ServiceInvocationException) t).getErrorCode());
}));
}
@@ -140,24 +140,11 @@ public void testCreateCommandConsumerFailsIfPeerRejectsLink(final TestContext ct
@Test
public void testCreateCommandConsumerSucceeds(final TestContext ctx) {
- final String address = "control/theTenant/theDevice";
final Handler commandHandler = mock(Handler.class);
final Handler closeHandler = mock(Handler.class);
- final Source source = mock(Source.class);
- when(source.getAddress()).thenReturn(address);
- when(receiver.getSource()).thenReturn(source);
- when(receiver.getRemoteSource()).thenReturn(source);
- factory.connect()
- .compose(c -> {
- final ArgumentCaptor>> linkOpenHandler = ArgumentCaptor.forClass(Handler.class);
- final Future consumer = factory.createCommandConsumer("theTenant", "theDevice", commandHandler, closeHandler);
- verify(con).createReceiver(address);
- verify(receiver).openHandler(linkOpenHandler.capture());
- verify(receiver).open();
- linkOpenHandler.getValue().handle(Future.succeededFuture(receiver));
- return consumer;
- }).setHandler(ctx.asyncAssertSuccess());
+ commandConnection.createCommandConsumer("theTenant", "theDevice", commandHandler, closeHandler)
+ .setHandler(ctx.asyncAssertSuccess());
}
/**
@@ -173,35 +160,21 @@ public void testCreateCommandConsumerSetsRemoteCloseHandler(final TestContext ct
final String address = "control/theTenant/theDevice";
final Handler commandHandler = mock(Handler.class);
final Handler closeHandler = mock(Handler.class);
- final Source source = mock(Source.class);
- when(source.getAddress()).thenReturn(address);
- when(receiver.getSource()).thenReturn(source);
- when(receiver.getRemoteSource()).thenReturn(source);
- // GIVEN a command consumer for which a close handler
- // has been registered
- factory.connect()
- .compose(c -> {
- final Future consumer = factory.createCommandConsumer("theTenant", "theDevice", commandHandler, closeHandler);
- verify(con).createReceiver(address);
- final ArgumentCaptor>> linkOpenHandler = ArgumentCaptor.forClass(Handler.class);
- verify(receiver).openHandler(linkOpenHandler.capture());
- verify(receiver).open();
- linkOpenHandler.getValue().handle(Future.succeededFuture(receiver));
- return consumer;
- }).map(c -> {
- final ArgumentCaptor>> remoteCloseHandler = ArgumentCaptor.forClass(Handler.class);
- verify(receiver).closeHandler(remoteCloseHandler.capture());
- // WHEN the peer closes the link
- remoteCloseHandler.getValue().handle(Future.succeededFuture(receiver));
- // THEN the close handler is invoked
- verify(closeHandler).handle(null);
- return c;
- }).setHandler(ctx.asyncAssertSuccess());
+ commandConnection.createCommandConsumer("theTenant", "theDevice", commandHandler, closeHandler);
+ final ArgumentCaptor> captor = ArgumentCaptor.forClass(Handler.class);
+ verify(connection).createReceiver(
+ eq(address),
+ eq(ProtonQoS.AT_LEAST_ONCE),
+ any(ProtonMessageHandler.class),
+ eq(0),
+ captor.capture());
+ captor.getValue().handle(address);
+ verify(closeHandler).handle(null);
}
/**
- * Verifies that a command consumer's close method is invoked,
+ * Verifies that when a command consumer's close method is invoked,
* then
*
* - the underlying link is closed,
@@ -221,43 +194,35 @@ public void testLocalCloseRemovesCommandConsumerFromCache(final TestContext ctx)
when(source.getAddress()).thenReturn(address);
when(receiver.getSource()).thenReturn(source);
when(receiver.getRemoteSource()).thenReturn(source);
+ when(receiver.isOpen()).thenReturn(Boolean.TRUE);
when(vertx.setPeriodic(anyLong(), any(Handler.class))).thenReturn(10L);
// GIVEN a command consumer
- factory.connect()
- .compose(client -> {
- final Future consumer = factory.createCommandConsumer("theTenant", "theDevice", commandHandler, null, 5000L);
- verify(con).createReceiver(address);
- final ArgumentCaptor>> linkOpenHandler = ArgumentCaptor.forClass(Handler.class);
- verify(receiver).closeHandler(any(Handler.class));
- verify(receiver).openHandler(linkOpenHandler.capture());
- verify(receiver).open();
- linkOpenHandler.getValue().handle(Future.succeededFuture(receiver));
- when(receiver.isOpen()).thenReturn(Boolean.TRUE);
- verify(vertx).setPeriodic(eq(5000L), any(Handler.class));
- return consumer;
- }).map(consumer -> {
- // WHEN closing the link locally
- final Future localCloseHandler = Future.future();
- consumer.close(localCloseHandler.completer());
- final ArgumentCaptor>> closeHandler = ArgumentCaptor.forClass(Handler.class);
- verify(receiver, times(2)).closeHandler(closeHandler.capture());
- verify(receiver).close();
- // and the peer sends its detach frame
- closeHandler.getValue().handle(Future.succeededFuture(receiver));
- return null;
- }).map(ok -> {
- // THEN the liveness check is canceled
- verify(vertx).cancelTimer(10L);
- // and the next attempt to create a command consumer for the same address
- final Future newConsumer = factory.createCommandConsumer("theTenant", "theDevice", commandHandler, null);
- // results in a new link to be opened
- verify(con, times(2)).createReceiver(address);
- final ArgumentCaptor>> linkOpenHandler = ArgumentCaptor.forClass(Handler.class);
- verify(receiver, times(2)).openHandler(linkOpenHandler.capture());
- linkOpenHandler.getValue().handle(Future.succeededFuture(receiver));
- return newConsumer;
- }).setHandler(ctx.asyncAssertSuccess());
+ commandConnection.createCommandConsumer("theTenant", "theDevice", commandHandler, null, 5000L)
+ .map(consumer -> {
+ verify(vertx).setPeriodic(eq(5000L), any(Handler.class));
+ // WHEN closing the link locally
+ final Future localCloseHandler = Future.future();
+ consumer.close(localCloseHandler);
+ final ArgumentCaptor> closeHandler = ArgumentCaptor.forClass(Handler.class);
+ verify(connection).closeAndFree(eq(receiver), closeHandler.capture());
+ // and the peer sends its detach frame
+ closeHandler.getValue().handle(null);
+ return null;
+ }).map(ok -> {
+ // THEN the liveness check is canceled
+ verify(vertx).cancelTimer(10L);
+ // and the next attempt to create a command consumer for the same address
+ final Future newConsumer = commandConnection.createCommandConsumer("theTenant", "theDevice", commandHandler, null);
+ // results in a new link to be opened
+ verify(connection, times(2)).createReceiver(
+ eq(address),
+ eq(ProtonQoS.AT_LEAST_ONCE),
+ any(ProtonMessageHandler.class),
+ eq(0),
+ any(Handler.class));
+ return newConsumer;
+ }).setHandler(ctx.asyncAssertSuccess());
}
/**
@@ -278,49 +243,42 @@ public void testConsumerIsRecreatedOnConnectionFailure(final TestContext ctx) {
when(receiver.getSource()).thenReturn(source);
when(receiver.getRemoteSource()).thenReturn(source);
when(vertx.setPeriodic(anyLong(), any(Handler.class))).thenReturn(10L);
+ doAnswer(invocation -> {
+ final Handler handler = invocation.getArgument(1);
+ handler.handle(null);
+ return null;
+ }).when(connection).closeAndFree(any(), any(Handler.class));
// GIVEN a command connection with an established command consumer
// which is checked periodically for liveness
- factory.connect().setHandler(ctx.asyncAssertSuccess());
- assertTrue(connectionFactory.await());
- connectionFactory.setExpectedSucceedingConnectionAttempts(1);
// intentionally using a check interval that is smaller than the minimum
final long livenessCheckInterval = CommandConsumerFactoryImpl.MIN_LIVENESS_CHECK_INTERVAL_MILLIS - 1;
- final Async consumerCreation = ctx.async();
- final Future commandConsumer = factory.createCommandConsumer(
- "theTenant", "theDevice", commandHandler, closeHandler, eq(livenessCheckInterval));
- commandConsumer.setHandler(ctx.asyncAssertSuccess(ok -> consumerCreation.complete()));
- verify(con).createReceiver(address);
- final ArgumentCaptor>> linkOpenHandler = ArgumentCaptor.forClass(Handler.class);
- verify(receiver).openHandler(linkOpenHandler.capture());
- verify(receiver).open();
- linkOpenHandler.getValue().handle(Future.succeededFuture(receiver));
+ final Future commandConsumer = commandConnection.createCommandConsumer(
+ "theTenant", "theDevice", commandHandler, closeHandler, livenessCheckInterval);
+ assertTrue(commandConsumer.isComplete());
final ArgumentCaptor> livenessCheck = ArgumentCaptor.forClass(Handler.class);
// the liveness check is registered with the minimum interval length
verify(vertx).setPeriodic(eq(CommandConsumerFactoryImpl.MIN_LIVENESS_CHECK_INTERVAL_MILLIS), livenessCheck.capture());
- consumerCreation.await();
// WHEN the command connection fails
- connectionFactory.getDisconnectHandler().handle(con);
+ final ArgumentCaptor disconnectListener = ArgumentCaptor.forClass(DisconnectListener.class);
+ verify(connection).addDisconnectListener(disconnectListener.capture());
+ disconnectListener.getValue().onDisconnect(connection);
// THEN the connection is re-established
- assertTrue(connectionFactory.await());
+ when(connection.isConnected()).thenReturn(Future.succeededFuture());
// and the liveness check re-creates the command consumer
- final Async consumerRecreation = ctx.async();
- when(receiver.open()).thenAnswer(invocation -> {
- consumerRecreation.complete();
- return receiver;
- });
livenessCheck.getValue().handle(10L);
- verify(con, times(2)).createReceiver(address);
- verify(receiver, times(2)).openHandler(any(Handler.class));
- verify(receiver, times(2)).open();
- consumerRecreation.await();
-
- // and when the consumer is finally closed
- final Future localCloseHandler = mock(Future.class);
- commandConsumer.result().close(localCloseHandler);
+ verify(connection, times(2)).createReceiver(
+ eq(address),
+ eq(ProtonQoS.AT_LEAST_ONCE),
+ any(ProtonMessageHandler.class),
+ eq(0),
+ any(Handler.class));
+
+ // and when the consumer is finally closed locally
+ commandConsumer.result().close(null);
// then the liveness check has been canceled
verify(vertx).cancelTimer(10L);
}
@@ -340,13 +298,15 @@ public void testLivenessCheckLocksRecreationAttempt(final TestContext ctx) {
final String address = "control/theTenant/theDevice";
final Handler commandHandler = mock(Handler.class);
final Handler remoteCloseHandler = mock(Handler.class);
-
- // GIVEN an established command connection
- factory.connect().setHandler(ctx.asyncAssertSuccess());
- assertTrue(connectionFactory.await());
- connectionFactory.setExpectedSucceedingConnectionAttempts(1);
-
- final Handler livenessCheck = factory.newLivenessCheck("theTenant", "theDevice", "key", commandHandler, remoteCloseHandler);
+ final Handler livenessCheck = commandConnection.newLivenessCheck("theTenant", "theDevice", "key", commandHandler, remoteCloseHandler);
+ final Future createdReceiver = Future.future();
+ when(connection.isConnected()).thenReturn(Future.succeededFuture());
+ when(connection.createReceiver(
+ eq(address),
+ eq(ProtonQoS.AT_LEAST_ONCE),
+ any(ProtonMessageHandler.class),
+ anyInt(),
+ any(Handler.class))).thenReturn(createdReceiver);
// WHEN the liveness check fires
livenessCheck.handle(10L);
@@ -354,17 +314,24 @@ public void testLivenessCheckLocksRecreationAttempt(final TestContext ctx) {
livenessCheck.handle(10L);
// THEN only one attempt has been made to recreate the consumer link
- verify(con, times(1)).createReceiver(address);
+ verify(connection, times(1)).createReceiver(
+ eq(address),
+ eq(ProtonQoS.AT_LEAST_ONCE),
+ any(ProtonMessageHandler.class),
+ eq(0),
+ any(Handler.class));
// and when the first attempt has finally timed out
- when(receiver.getRemoteCondition()).thenReturn(new ErrorCondition(AmqpError.INTERNAL_ERROR, "internal error"));
- final ArgumentCaptor>> linkOpenHandler = ArgumentCaptor.forClass(Handler.class);
- verify(receiver).openHandler(linkOpenHandler.capture());
- linkOpenHandler.getValue().handle(Future.failedFuture("internal error"));
+ createdReceiver.fail(new ServerErrorException(HttpURLConnection.HTTP_UNAVAILABLE));
// then the next run of the liveness check
livenessCheck.handle(10L);
// will start a new attempt to re-create the consumer link
- verify(con, times(2)).createReceiver(address);
+ verify(connection, times(2)).createReceiver(
+ eq(address),
+ eq(ProtonQoS.AT_LEAST_ONCE),
+ any(ProtonMessageHandler.class),
+ eq(0),
+ any(Handler.class));
}
}
diff --git a/client/src/test/java/org/eclipse/hono/client/impl/HonoConnectionImplTest.java b/client/src/test/java/org/eclipse/hono/client/impl/HonoConnectionImplTest.java
index 23e57dd7c2..55af5c52fa 100644
--- a/client/src/test/java/org/eclipse/hono/client/impl/HonoConnectionImplTest.java
+++ b/client/src/test/java/org/eclipse/hono/client/impl/HonoConnectionImplTest.java
@@ -244,40 +244,6 @@ public void testConnectFailsWithClientErrorForPermanentSaslSystemException(final
assertTrue(connectionFactory.awaitFailure());
}
- /**
- * Verifies that a request to create a consumer is failed immediately when the
- * underlying connection to the server fails.
- *
- * @param ctx The vert.x test context.
- */
- @Test
- public void testCreateConsumerFailsOnConnectionFailure(final TestContext ctx) {
-
- // GIVEN a connected client that already tries to create a telemetry sender for "tenant"
- final Async connected = ctx.async();
- honoConnection.connect(new ProtonClientOptions()).setHandler(ctx.asyncAssertSuccess(ok -> connected.complete()));
- connected.await();
-
- final Async creationFailure = ctx.async();
- final Async supplierInvocation = ctx.async();
- honoConnection.createConsumer("tenant", () -> {
- supplierInvocation.complete();
- return Future.future();
- }).setHandler(ctx.asyncAssertFailure(cause -> {
- creationFailure.complete();
- }));
- // wait until the consumer supplier has been invoked
- // so that we can be sure that the disconnect handler for
- // for the creation request has been registered
- supplierInvocation.await();
-
- // WHEN the underlying connection fails
- connectionFactory.getDisconnectHandler().handle(con);
-
- // THEN all creation requests are failed
- creationFailure.await();
- }
-
/**
* Verifies that the client tries to re-establish a lost connection to a server.
*
diff --git a/service-base/src/main/java/org/eclipse/hono/service/AbstractAdapterConfig.java b/service-base/src/main/java/org/eclipse/hono/service/AbstractAdapterConfig.java
index ab6c68cf8d..4eb94e71e9 100644
--- a/service-base/src/main/java/org/eclipse/hono/service/AbstractAdapterConfig.java
+++ b/service-base/src/main/java/org/eclipse/hono/service/AbstractAdapterConfig.java
@@ -23,7 +23,6 @@
import org.eclipse.hono.client.RegistrationClientFactory;
import org.eclipse.hono.client.RequestResponseClientConfigProperties;
import org.eclipse.hono.client.TenantClientFactory;
-import org.eclipse.hono.client.impl.CommandConsumerFactoryImpl;
import org.eclipse.hono.config.ApplicationConfigProperties;
import org.eclipse.hono.config.ClientConfigProperties;
import org.eclipse.hono.config.VertxProperties;
@@ -299,7 +298,7 @@ public ClientConfigProperties commandConsumerFactoryConfig() {
@Bean
@Scope("prototype")
public CommandConsumerFactory commandConsumerFactory() {
- return new CommandConsumerFactoryImpl(vertx(), commandConsumerFactoryConfig());
+ return CommandConsumerFactory.create(HonoConnection.newConnection(vertx(), commandConsumerFactoryConfig()));
}
/**
diff --git a/site/content/release-notes.md b/site/content/release-notes.md
index fa3cc730d0..a03ef8a221 100644
--- a/site/content/release-notes.md
+++ b/site/content/release-notes.md
@@ -25,12 +25,33 @@ title = "Release Notes"
### API Changes
-* Several changes have been made to the `org.eclipse.hono.client.MessageSender` interface:
+* The `hono-client` module has undergone several major and incompatible changes. The most
+ important change affects the `HonoClient` interface which no longer serves as a factory
+ for the arbitrary clients to the Hono service endpoints.
+ It has been renamed to `HonoConnection` and now only represents the underlying
+ AMQP connection to a peer and provides methods for managing the connection state
+ and registering listeners for arbitrary life-cycle events of the connection.
+ In addition to this, several factory interfaces have been added which can be used
+ to create specific clients to Hono's arbitrary services. All of the former `HonoClient`
+ interface's factory methods have been distributed accordingly to:
+ * `org.eclipse.hono.client.ApplicationClientFactory` for creating clients to
+ Hono's north bound Telemetry, Event and Command & Control API.
+ * `org.eclipse.hono.client.DownstreamSenderFactory` for creating clients to
+ Hono's south bound Telemetry and Event APIs.
+ * `org.eclipse.hono.client.CommandConsumerFactory` for creating clients to
+ Hono's south bound Command & Control API.
+ * `org.eclipse.hono.client.TenantClientFactory` for creating clients to
+ Hono's Tenant API.
+ * `org.eclipse.hono.client.RegistrationClientFactory` for creating clients to
+ Hono's Device Registration API.
+ * `org.eclipse.hono.client.CredentialsClientFactory` for creating clients to
+ Hono's Credentials API.
+* In this context the `org.eclipse.hono.client.MessageSender` interface has been changed as follows:
* The *send* methods have been changed to no longer accept a *registration assertion token*
which became obsolete with the removal of the *Hono Messaging* component.
* The *isRegistrationAssertionRequired* method has been removed from the interface.
* All *send* method variants which accept specific message parameters have been moved into
- the new `org.eclipse.hono.client.DownstreamSender` interface which extends
+ the new `org.eclipse.hono.client.DownstreamSender` interface which extends the existing
`MessageSender`.
* Several changes have been made to the `org.eclipse.hono.service.AbstractProtocolAdapterBase`
class:
@@ -82,6 +103,12 @@ title = "Release Notes"
have been deprecated. They will be removed from Hono 1.0 altogether.
A new HTTP based API will be defined instead which can then be used to *manage* the content
of a device registry.
+* `org.eclipse.hono.client.HonoConnection`'s *connect* method variants accepting
+ a disconnect handler have been deprecated and will be removed in Hono 1.0.
+ Client code should use one of the other *connect* methods instead and register a
+ `org.eclipse.hono.client.DisconnectListener` and/or a
+ `org.eclipse.hono.client.ReconnectListener` to get notified about life-cycle
+ events of the underlying AMQP connection.
## 1.0-M1