diff --git a/cli/src/main/java/org/eclipse/hono/cli/AbstractClient.java b/cli/src/main/java/org/eclipse/hono/cli/AbstractClient.java index 449e3846e5..6a531bcec9 100644 --- a/cli/src/main/java/org/eclipse/hono/cli/AbstractClient.java +++ b/cli/src/main/java/org/eclipse/hono/cli/AbstractClient.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2016, 2018 Contributors to the Eclipse Foundation + * Copyright (c) 2016, 2019 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -15,7 +15,7 @@ import java.util.Objects; -import org.eclipse.hono.client.HonoClient; +import org.eclipse.hono.client.HonoApplicationClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -31,10 +31,10 @@ abstract class AbstractClient extends AbstractCliClient { protected String deviceId; @Value(value = "${connection.retryInterval}") protected int connectionRetryInterval; - protected HonoClient client; + protected HonoApplicationClient client; @Autowired - public final void setHonoClient(final HonoClient client) { + public final void setHonoClient(final HonoApplicationClient client) { this.client = Objects.requireNonNull(client); } } diff --git a/cli/src/main/java/org/eclipse/hono/cli/AppConfiguration.java b/cli/src/main/java/org/eclipse/hono/cli/AppConfiguration.java index 56d972f6c7..bb05c9cf2a 100644 --- a/cli/src/main/java/org/eclipse/hono/cli/AppConfiguration.java +++ b/cli/src/main/java/org/eclipse/hono/cli/AppConfiguration.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2016, 2018 Contributors to the Eclipse Foundation + * Copyright (c) 2016, 2019 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -13,8 +13,8 @@ package org.eclipse.hono.cli; -import org.eclipse.hono.client.HonoClient; -import org.eclipse.hono.client.impl.HonoClientImpl; +import org.eclipse.hono.client.HonoApplicationClient; +import org.eclipse.hono.client.impl.HonoApplicationClientImpl; import org.eclipse.hono.config.ClientConfigProperties; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; @@ -68,12 +68,12 @@ public ClientConfigProperties honoClientConfig() { } /** - * Exposes a {@code HonoClient} as a Spring bean. + * Exposes a {@code HonoApplicationClient} as a Spring bean. * * @return The Hono client. */ @Bean - public HonoClient honoClient() { - return new HonoClientImpl(vertx(), honoClientConfig()); + public HonoApplicationClient honoClient() { + return new HonoApplicationClientImpl(vertx(), honoClientConfig()); } } diff --git a/cli/src/main/java/org/eclipse/hono/cli/Receiver.java b/cli/src/main/java/org/eclipse/hono/cli/Receiver.java index a8d95a279e..8af475302b 100644 --- a/cli/src/main/java/org/eclipse/hono/cli/Receiver.java +++ b/cli/src/main/java/org/eclipse/hono/cli/Receiver.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2016, 2018 Contributors to the Eclipse Foundation + * Copyright (c) 2016, 2019 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -20,7 +20,6 @@ import io.vertx.core.buffer.Buffer; import org.apache.qpid.proton.message.Message; -import org.eclipse.hono.client.HonoClient; import org.eclipse.hono.util.MessageHelper; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Profile; @@ -62,16 +61,16 @@ public class Receiver extends AbstractClient { @PostConstruct Future start() { return client.connect(this::onDisconnect) - .compose(this::createConsumer) + .compose(res -> createConsumer()) .setHandler(this::handleCreateConsumerStatus); } - private CompositeFuture createConsumer(final HonoClient connectedClient) { + private CompositeFuture createConsumer() { final Handler closeHandler = closeHook -> { LOG.info("close handler of consumer is called"); vertx.setTimer(connectionRetryInterval, reconnect -> { LOG.info("attempting to re-open the consumer link ..."); - createConsumer(connectedClient); + createConsumer(); }); }; @@ -79,11 +78,11 @@ private CompositeFuture createConsumer(final HonoClient connectedClient) { final List consumerFutures = new ArrayList<>(); if (messageType.equals(TYPE_EVENT) || messageType.equals(TYPE_ALL)) { consumerFutures.add( - connectedClient.createEventConsumer(tenantId, msg -> handleMessage(TYPE_EVENT, msg), closeHandler)); + client.createEventConsumer(tenantId, msg -> handleMessage(TYPE_EVENT, msg), closeHandler)); } if (messageType.equals(TYPE_TELEMETRY) || messageType.equals(TYPE_ALL)) { - consumerFutures.add(connectedClient + consumerFutures.add(client .createTelemetryConsumer(tenantId, msg -> handleMessage(TYPE_TELEMETRY, msg), closeHandler)); } @@ -101,7 +100,7 @@ private void onDisconnect(final ProtonConnection con) { vertx.setTimer(connectionRetryInterval, reconnect -> { LOG.info("attempting to re-connect to Hono ..."); client.connect(this::onDisconnect) - .compose(this::createConsumer); + .compose(res -> createConsumer()); }); } diff --git a/cli/src/test/java/org/eclipse/hono/cli/ReceiverTest.java b/cli/src/test/java/org/eclipse/hono/cli/ReceiverTest.java index d395781d37..25337fdaff 100644 --- a/cli/src/test/java/org/eclipse/hono/cli/ReceiverTest.java +++ b/cli/src/test/java/org/eclipse/hono/cli/ReceiverTest.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2016, 2018 Contributors to the Eclipse Foundation + * Copyright (c) 2016, 2019 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; -import org.eclipse.hono.client.HonoClient; +import org.eclipse.hono.client.HonoApplicationClient; import org.eclipse.hono.client.MessageConsumer; import org.junit.After; import org.junit.Before; @@ -63,7 +63,7 @@ public void setup() { final Vertx vertx = mock(Vertx.class); when(vertx.getOrCreateContext()).thenReturn(mock(Context.class)); - final HonoClient client = mock(HonoClient.class); + final HonoApplicationClient client = mock(HonoApplicationClient.class); when(client.connect(any(Handler.class))).thenReturn(Future.succeededFuture(client)); when(client.connect()).thenReturn(Future.succeededFuture(client)); when(client.createTelemetryConsumer(anyString(), any(Consumer.class), any(Handler.class))) diff --git a/client/src/main/java/org/eclipse/hono/client/HonoApplicationClient.java b/client/src/main/java/org/eclipse/hono/client/HonoApplicationClient.java new file mode 100644 index 0000000000..893e2fdbe8 --- /dev/null +++ b/client/src/main/java/org/eclipse/hono/client/HonoApplicationClient.java @@ -0,0 +1,161 @@ +/******************************************************************************* + * Copyright (c) 2016, 2019 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + *******************************************************************************/ + +package org.eclipse.hono.client; + +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +import org.apache.qpid.proton.message.Message; +import org.eclipse.hono.config.ClientConfigProperties; + +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.proton.ProtonDelivery; + +/** + * A factory for creating clients for Hono's APIs. + *

+ * A factory maintains a single AMQP 1.0 connection and a single session to the peer. This session is shared by all AMQP + * 1.0 links established for senders, consumers and clients created using the corresponding + * factory methods. + *

+ * The getOrCreate factory methods return an existing client for the given address if available. Note that + * factory methods for creating consumers always return a new instance so that all messages received are only + * processed by the handler passed in to the factory method. + *

+ * Before any of the factory methods can be invoked successfully, the client needs to connect to Hono. This is done by + * invoking one of the client's connect methods. + *

+ * An AMQP connection is established in multiple stages: + *

    + *
  1. The client establishes a TCP connection to the peer. For this to succeed, the peer must have registered a + * socket listener on the IP address and port that the client is configured to use.
  2. + *
  3. The client performs a SASL handshake with the peer if required by the peer. The client needs to be + * configured with correct credentials in order for this stage to succeed.
  4. + *
  5. Finally, the client and the peer need to agree on AMQP 1.0 specific connection parameters like capabilities + * and session window size.
  6. + *
+ * Some of the connect methods accept a {@code ProtonClientOptions} type parameter. Note that these options + * only influence the client's behavior when establishing the TCP connection with the peer. The overall behavior of + * the client regarding the establishment of the AMQP connection must be configured using the + * {@link ClientConfigProperties} passed in to the newClient method. In particular, the + * {@link ClientConfigProperties#setReconnectAttempts(int)} method can be used to specify, how many + * times the client should try to establish a connection to the peer before giving up. + *

+ * NB When the client tries to establish a connection to the peer, it stores the current + * vert.x {@code Context} in a local variable and performs all further interactions with the peer running + * on this Context. Invoking any of the methods of the client from a vert.x Context other than the one + * used for establishing the connection may cause race conditions or even deadlocks because the handlers + * registered on the {@code Future}s returned by these methods will be invoked from the stored Context. + * It is the invoking code's responsibility to either ensure that the client's methods are always invoked + * from the same Context or to make sure that the handlers are running on the correct Context, e.g. by using + * the {@code Context}'s runOnContext method. + */ +public interface HonoApplicationClient extends HonoClient { + + /** + * Creates a client for consuming data from Hono's north bound Telemetry API. + * + * @param tenantId The tenant to consume data for. + * @param telemetryConsumer The handler to invoke with every message received. + * @param closeHandler The handler invoked when the peer detaches the link. + * @return A future that will complete with the consumer once the link has been established. The future will fail if + * the link cannot be established, e.g. because this client is not connected. + * @throws NullPointerException if any of the parameters is {@code null}. + */ + Future createTelemetryConsumer(String tenantId, Consumer telemetryConsumer, + Handler closeHandler); + + /** + * Creates a client for consuming events from Hono's north bound Event API. + *

+ * The events passed in to the event consumer will be settled automatically if the consumer does not throw an + * exception. + * + * @param tenantId The tenant to consume events for. + * @param eventConsumer The handler to invoke with every event received. + * @param closeHandler The handler invoked when the peer detaches the link. + * @return A future that will complete with the consumer once the link has been established. The future will fail if + * the link cannot be established, e.g. because this client is not connected. + * @throws NullPointerException if any of the parameters is {@code null}. + */ + Future createEventConsumer(String tenantId, Consumer eventConsumer, + Handler closeHandler); + + /** + * Creates a client for consuming events from Hono's north bound Event API. + *

+ * The events passed in to the event consumer will be settled automatically if the consumer does not throw an + * exception and does not manually handle the message disposition using the passed in delivery. + * + * @param tenantId The tenant to consume events for. + * @param eventConsumer The handler to invoke with every event received. + * @param closeHandler The handler invoked when the peer detaches the link. + * @return A future that will complete with the consumer once the link has been established. The future will fail if + * the link cannot be established, e.g. because this client is not connected. + * @throws NullPointerException if any of the parameters is {@code null}. + */ + Future createEventConsumer(String tenantId, BiConsumer eventConsumer, + Handler closeHandler); + + /** + * Gets a client for sending commands to a device. + *

+ * The client returned may be either newly created or it may be an existing + * client for the given device. + *

+ * This method will use an implementation specific mechanism (e.g. a UUID) to create + * a unique reply-to address to be included in commands sent to the device. The protocol + * adapters need to convey an encoding of the reply-to address to the device when delivering + * the command. Consequently, the number of bytes transferred to the device depends on the + * length of the reply-to address being used. In situations where this is a major concern it + * might be advisable to use {@link #getOrCreateCommandClient(String, String, String)} for + * creating a command client and provide custom (and shorter) reply-to identifier + * to be used in the reply-to address. + * + * @param tenantId The tenant that the device belongs to. + * @param deviceId The device to send the commands to. + * @return A future that will complete with the command and control client (if successful) or + * fail if the client cannot be created, e.g. because the underlying connection + * is not established or if a concurrent request to create a client for the same + * tenant and device is already being executed. + * @throws NullPointerException if the tenantId is {@code null}. + */ + Future getOrCreateCommandClient(String tenantId, String deviceId); + + /** + * Gets a client for sending commands to a device. + *

+ * The client returned may be either newly created or it may be an existing + * client for the given device. + * + * @param tenantId The tenant that the device belongs to. + * @param deviceId The device to send the commands to. + * @param replyId An arbitrary string which (in conjunction with the tenant and device ID) uniquely + * identifies this command client. + * This identifier will only be used for creating a new client for the device. + * If this method returns an existing client for the device then the client will use + * the reply-to address determined during its initial creation. In particular, this + * means that if the (existing) client has originally been created using the + * {@link #getOrCreateCommandClient(String, String)} method, then the reply-to address + * used by the client will most likely not contain the given identifier. + * @return A future that will complete with the command and control client (if successful) or + * fail if the client cannot be created, e.g. because the underlying connection + * is not established or if a concurrent request to create a client for the same + * tenant and device is already being executed. + * @throws NullPointerException if the tenantId is {@code null}. + */ + Future getOrCreateCommandClient(String tenantId, String deviceId, String replyId); + +} diff --git a/client/src/main/java/org/eclipse/hono/client/HonoClient.java b/client/src/main/java/org/eclipse/hono/client/HonoClient.java index 89b7de660b..daccad3be7 100644 --- a/client/src/main/java/org/eclipse/hono/client/HonoClient.java +++ b/client/src/main/java/org/eclipse/hono/client/HonoClient.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2016, 2018 Contributors to the Eclipse Foundation + * Copyright (c) 2016, 2019 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -13,23 +13,19 @@ package org.eclipse.hono.client; -import java.util.function.BiConsumer; -import java.util.function.Consumer; - import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.message.Message; import org.eclipse.hono.client.impl.HonoClientImpl; import org.eclipse.hono.config.ClientConfigProperties; + import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Vertx; import io.vertx.proton.ProtonClientOptions; import io.vertx.proton.ProtonConnection; -import io.vertx.proton.ProtonDelivery; /** - * A factory for creating clients for Hono's arbitrary APIs. + * A factory for creating clients for Hono's APIs. *

* A factory maintains a single AMQP 1.0 connection and a single session to the peer. This session is shared by all AMQP * 1.0 links established for senders, consumers and clients created using the corresponding @@ -293,51 +289,6 @@ Future connect( */ Future getOrCreateEventSender(String tenantId, String deviceId); - /** - * Creates a client for consuming data from Hono's north bound Telemetry API. - * - * @param tenantId The tenant to consume data for. - * @param telemetryConsumer The handler to invoke with every message received. - * @param closeHandler The handler invoked when the peer detaches the link. - * @return A future that will complete with the consumer once the link has been established. The future will fail if - * the link cannot be established, e.g. because this client is not connected. - * @throws NullPointerException if any of the parameters is {@code null}. - */ - Future createTelemetryConsumer(String tenantId, Consumer telemetryConsumer, - Handler closeHandler); - - /** - * Creates a client for consuming events from Hono's north bound Event API. - *

- * The events passed in to the event consumer will be settled automatically if the consumer does not throw an - * exception. - * - * @param tenantId The tenant to consume events for. - * @param eventConsumer The handler to invoke with every event received. - * @param closeHandler The handler invoked when the peer detaches the link. - * @return A future that will complete with the consumer once the link has been established. The future will fail if - * the link cannot be established, e.g. because this client is not connected. - * @throws NullPointerException if any of the parameters is {@code null}. - */ - Future createEventConsumer(String tenantId, Consumer eventConsumer, - Handler closeHandler); - - /** - * Creates a client for consuming events from Hono's north bound Event API. - *

- * The events passed in to the event consumer will be settled automatically if the consumer does not throw an - * exception and does not manually handle the message disposition using the passed in delivery. - * - * @param tenantId The tenant to consume events for. - * @param eventConsumer The handler to invoke with every event received. - * @param closeHandler The handler invoked when the peer detaches the link. - * @return A future that will complete with the consumer once the link has been established. The future will fail if - * the link cannot be established, e.g. because this client is not connected. - * @throws NullPointerException if any of the parameters is {@code null}. - */ - Future createEventConsumer(String tenantId, BiConsumer eventConsumer, - Handler closeHandler); - /** * Gets a client for invoking operations on a service implementing Hono's Device Registration API. * @@ -375,55 +326,6 @@ Future createEventConsumer(String tenantId, BiConsumer getOrCreateTenantClient(); - /** - * Gets a client for sending commands to a device. - *

- * The client returned may be either newly created or it may be an existing - * client for the given device. - *

- * This method will use an implementation specific mechanism (e.g. a UUID) to create - * a unique reply-to address to be included in commands sent to the device. The protocol - * adapters need to convey an encoding of the reply-to address to the device when delivering - * the command. Consequently, the number of bytes transferred to the device depends on the - * length of the reply-to address being used. In situations where this is a major concern it - * might be advisable to use {@link #getOrCreateCommandClient(String, String, String)} for - * creating a command client and provide custom (and shorter) reply-to identifier - * to be used in the reply-to address. - * - * @param tenantId The tenant that the device belongs to. - * @param deviceId The device to send the commands to. - * @return A future that will complete with the command and control client (if successful) or - * fail if the client cannot be created, e.g. because the underlying connection - * is not established or if a concurrent request to create a client for the same - * tenant and device is already being executed. - * @throws NullPointerException if the tenantId is {@code null}. - */ - Future getOrCreateCommandClient(String tenantId, String deviceId); - - /** - * Gets a client for sending commands to a device. - *

- * The client returned may be either newly created or it may be an existing - * client for the given device. - * - * @param tenantId The tenant that the device belongs to. - * @param deviceId The device to send the commands to. - * @param replyId An arbitrary string which (in conjunction with the tenant and device ID) uniquely - * identifies this command client. - * This identifier will only be used for creating a new client for the device. - * If this method returns an existing client for the device then the client will use - * the reply-to address determined during its initial creation. In particular, this - * means that if the (existing) client has originally been created using the - * {@link #getOrCreateCommandClient(String, String)} method, then the reply-to address - * used by the client will most likely not contain the given identifier. - * @return A future that will complete with the command and control client (if successful) or - * fail if the client cannot be created, e.g. because the underlying connection - * is not established or if a concurrent request to create a client for the same - * tenant and device is already being executed. - * @throws NullPointerException if the tenantId is {@code null}. - */ - Future getOrCreateCommandClient(String tenantId, String deviceId, String replyId); - /** * Closes this client's connection to the Hono server. *

diff --git a/client/src/main/java/org/eclipse/hono/client/impl/HonoApplicationClientImpl.java b/client/src/main/java/org/eclipse/hono/client/impl/HonoApplicationClientImpl.java new file mode 100644 index 0000000000..5b00db1f0f --- /dev/null +++ b/client/src/main/java/org/eclipse/hono/client/impl/HonoApplicationClientImpl.java @@ -0,0 +1,189 @@ +/******************************************************************************* + * Copyright (c) 2016, 2019 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + *******************************************************************************/ +package org.eclipse.hono.client.impl; + +import java.util.Objects; +import java.util.UUID; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +import org.apache.qpid.proton.message.Message; +import org.eclipse.hono.client.CommandClient; +import org.eclipse.hono.client.HonoApplicationClient; +import org.eclipse.hono.client.MessageConsumer; +import org.eclipse.hono.client.RequestResponseClient; +import org.eclipse.hono.config.ClientConfigProperties; +import org.eclipse.hono.connection.ConnectionFactory; +import org.eclipse.hono.util.CommandConstants; +import org.eclipse.hono.util.ResourceIdentifier; + +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.proton.ProtonDelivery; + +/** + * A helper class for creating Vert.x based clients for Hono's APIs. + *

+ * The client ensures that all interactions with the peer are performed on the + * same vert.x {@code Context}. For this purpose the connect methods + * either use the current Context or create a new Context for connecting to + * the peer. This same Context is then used for all consecutive interactions with + * the peer as well, e.g. when creating consumers or senders. + *

+ * Closing or disconnecting the client will release the Context. The next + * invocation of any of the connect methods will then use the same approach as + * described above to determine the Context to use. + */ +public class HonoApplicationClientImpl extends HonoClientImpl implements HonoApplicationClient { + + /** + * Creates a new client for a set of configuration properties. + *

+ * This constructor creates a connection factory using + * {@link ConnectionFactory#newConnectionFactory(Vertx, ClientConfigProperties)}. + * + * @param vertx The Vert.x instance to execute the client on, if {@code null} a new Vert.x instance is used. + * @param clientConfigProperties The configuration properties to use. + * @throws NullPointerException if clientConfigProperties is {@code null} + */ + public HonoApplicationClientImpl(final Vertx vertx, final ClientConfigProperties clientConfigProperties) { + this(vertx, null, clientConfigProperties); + } + + /** + * Creates a new client for a set of configuration properties. + *

+ * NB Make sure to always use the same set of configuration properties for both the connection factory as + * well as the Hono client in order to prevent unexpected behavior. + * + * @param vertx The Vert.x instance to execute the client on, if {@code null} a new Vert.x instance is used. + * @param connectionFactory The factory to use for creating an AMQP connection to the Hono server. + * @param clientConfigProperties The configuration properties to use. + * @throws NullPointerException if clientConfigProperties is {@code null} + */ + public HonoApplicationClientImpl(final Vertx vertx, final ConnectionFactory connectionFactory, + final ClientConfigProperties clientConfigProperties) { + super(vertx, connectionFactory, clientConfigProperties); + } + + /** + * {@inheritDoc} + */ + @Override + public final Future createTelemetryConsumer( + final String tenantId, + final Consumer messageConsumer, + final Handler closeHandler) { + + return createConsumer( + tenantId, + () -> newTelemetryConsumer(tenantId, messageConsumer, closeHandler)); + } + + private Future newTelemetryConsumer( + final String tenantId, + final Consumer messageConsumer, + final Handler closeHandler) { + + return checkConnected().compose(con -> { + final Future result = Future.future(); + TelemetryConsumerImpl.create(context, clientConfigProperties, connection, tenantId, + connectionFactory.getPathSeparator(), messageConsumer, result.completer(), + closeHook -> closeHandler.handle(null)); + return result; + }); + } + + /** + * {@inheritDoc} + */ + @Override + public final Future createEventConsumer( + final String tenantId, + final Consumer eventConsumer, + final Handler closeHandler) { + + return createEventConsumer(tenantId, (delivery, message) -> eventConsumer.accept(message), closeHandler); + } + + /** + * {@inheritDoc} + */ + @Override + public final Future createEventConsumer( + final String tenantId, + final BiConsumer messageConsumer, + final Handler closeHandler) { + + return createConsumer( + tenantId, + () -> newEventConsumer(tenantId, messageConsumer, closeHandler)); + } + + private Future newEventConsumer( + final String tenantId, + final BiConsumer messageConsumer, + final Handler closeHandler) { + + return checkConnected().compose(con -> { + final Future result = Future.future(); + EventConsumerImpl.create(context, clientConfigProperties, connection, tenantId, + connectionFactory.getPathSeparator(), messageConsumer, result.completer(), + closeHook -> closeHandler.handle(null)); + return result; + }); + } + + /** + * {@inheritDoc} + */ + @Override + public Future getOrCreateCommandClient(final String tenantId, final String deviceId) { + return getOrCreateCommandClient(tenantId, deviceId, UUID.randomUUID().toString()); + } + + /** + * {@inheritDoc} + */ + @Override + public Future getOrCreateCommandClient(final String tenantId, final String deviceId, + final String replyId) { + + Objects.requireNonNull(tenantId); + Objects.requireNonNull(deviceId); + Objects.requireNonNull(replyId); + + log.debug("get or create command client for [tenantId: {}, deviceId: {}, replyId: {}]", tenantId, deviceId, + replyId); + return getOrCreateRequestResponseClient( + ResourceIdentifier.from(CommandConstants.COMMAND_ENDPOINT, tenantId, deviceId).toString(), + () -> newCommandClient(tenantId, deviceId, replyId)).map(c -> (CommandClient) c); + } + + private Future newCommandClient(final String tenantId, final String deviceId, + final String replyId) { + return checkConnected().compose(connected -> { + final Future result = Future.future(); + CommandClientImpl.create( + context, + clientConfigProperties, + connection, + tenantId, deviceId, replyId, + this::removeActiveRequestResponseClient, + this::removeActiveRequestResponseClient, + result.completer()); + return result.map(client -> (RequestResponseClient) client); + }); + } +} diff --git a/client/src/main/java/org/eclipse/hono/client/impl/HonoClientImpl.java b/client/src/main/java/org/eclipse/hono/client/impl/HonoClientImpl.java index 2790cf44e8..47b3aaae28 100644 --- a/client/src/main/java/org/eclipse/hono/client/impl/HonoClientImpl.java +++ b/client/src/main/java/org/eclipse/hono/client/impl/HonoClientImpl.java @@ -22,22 +22,17 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.BiConsumer; -import java.util.function.Consumer; import java.util.function.Supplier; import javax.security.sasl.AuthenticationException; import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.message.Message; import org.eclipse.hono.cache.CacheProvider; import org.eclipse.hono.client.ClientErrorException; -import org.eclipse.hono.client.CommandClient; import org.eclipse.hono.client.CredentialsClient; import org.eclipse.hono.client.HonoClient; import org.eclipse.hono.client.MessageConsumer; @@ -49,10 +44,8 @@ import org.eclipse.hono.client.TenantClient; import org.eclipse.hono.config.ClientConfigProperties; import org.eclipse.hono.connection.ConnectionFactory; -import org.eclipse.hono.util.CommandConstants; import org.eclipse.hono.util.Constants; import org.eclipse.hono.util.HonoProtonHelper; -import org.eclipse.hono.util.ResourceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -66,11 +59,10 @@ import io.vertx.core.Vertx; import io.vertx.proton.ProtonClientOptions; import io.vertx.proton.ProtonConnection; -import io.vertx.proton.ProtonDelivery; import io.vertx.proton.sasl.SaslSystemException; /** - * A helper class for creating Vert.x based clients for Hono's arbitrary APIs. + * A helper class for creating Vert.x based clients for Hono's APIs. *

* The client ensures that all interactions with the peer are performed on the * same vert.x {@code Context}. For this purpose the connect methods @@ -87,7 +79,7 @@ public class HonoClientImpl implements HonoClient { /** * A logger to be shared with subclasses. */ - protected final Logger log = LoggerFactory.getLogger(HonoClientImpl.class); + protected final Logger log = LoggerFactory.getLogger(getClass()); /** * The configuration properties for this client. */ @@ -110,6 +102,10 @@ public class HonoClientImpl implements HonoClient { * The vert.x Context to use for interacting with the peer. */ protected volatile Context context; + /** + * The ConnectionFactory to use for creating connections. + */ + protected final ConnectionFactory connectionFactory; private final Map activeRequestResponseClients = new HashMap<>(); private final Map creationLocks = new HashMap<>(); @@ -117,7 +113,6 @@ public class HonoClientImpl implements HonoClient { private final AtomicBoolean connecting = new AtomicBoolean(false); private final AtomicBoolean shuttingDown = new AtomicBoolean(false); private final AtomicBoolean disconnecting = new AtomicBoolean(false); - private final ConnectionFactory connectionFactory; private final Object connectionLock = new Object(); private ProtonClientOptions clientOptions; @@ -689,74 +684,6 @@ private void getOrCreateSender( } } - /** - * {@inheritDoc} - */ - @Override - public final Future createTelemetryConsumer( - final String tenantId, - final Consumer messageConsumer, - final Handler closeHandler) { - - return createConsumer( - tenantId, - () -> newTelemetryConsumer(tenantId, messageConsumer, closeHandler)); - } - - private Future newTelemetryConsumer( - final String tenantId, - final Consumer messageConsumer, - final Handler closeHandler) { - - return checkConnected().compose(con -> { - final Future result = Future.future(); - TelemetryConsumerImpl.create(context, clientConfigProperties, connection, tenantId, - connectionFactory.getPathSeparator(), messageConsumer, result.completer(), - closeHook -> closeHandler.handle(null)); - return result; - }); - } - - /** - * {@inheritDoc} - */ - @Override - public final Future createEventConsumer( - final String tenantId, - final Consumer eventConsumer, - final Handler closeHandler) { - - return createEventConsumer(tenantId, (delivery, message) -> eventConsumer.accept(message), closeHandler); - } - - /** - * {@inheritDoc} - */ - @Override - public final Future createEventConsumer( - final String tenantId, - final BiConsumer messageConsumer, - final Handler closeHandler) { - - return createConsumer( - tenantId, - () -> newEventConsumer(tenantId, messageConsumer, closeHandler)); - } - - private Future newEventConsumer( - final String tenantId, - final BiConsumer messageConsumer, - final Handler closeHandler) { - - return checkConnected().compose(con -> { - final Future result = Future.future(); - EventConsumerImpl.create(context, clientConfigProperties, connection, tenantId, - connectionFactory.getPathSeparator(), messageConsumer, result.completer(), - closeHook -> closeHandler.handle(null)); - return result; - }); - } - /** * Creates a new message consumer for a tenant. * @@ -854,13 +781,21 @@ protected final void removeCredentialsClient(final String tenantId) { removeActiveRequestResponseClient(targetAddress); } - private void removeActiveRequestResponseClient(final String targetAddress) { + /** + * Removes a RequestResponseClient from the list of active clients. + *

+ * Once a client has been removed, the next invocation of the corresponding getOrCreateRequestResponseClient + * method will result in a new client being created (and added to the list of active clients). + * + * @param key The key to look-up the client by. + */ + protected void removeActiveRequestResponseClient(final String key) { - final RequestResponseClient client = activeRequestResponseClients.remove(targetAddress); + final RequestResponseClient client = activeRequestResponseClients.remove(key); if (client != null) { client.close(s -> { }); - log.debug("closed and removed client for [{}]", targetAddress); + log.debug("closed and removed client for [{}]", key); } } @@ -983,48 +918,6 @@ protected void removeTenantClient() { removeActiveRequestResponseClient(targetAddress); } - /** - * {@inheritDoc} - */ - @Override - public Future getOrCreateCommandClient(final String tenantId, final String deviceId) { - return getOrCreateCommandClient(tenantId, deviceId, UUID.randomUUID().toString()); - } - - /** - * {@inheritDoc} - */ - @Override - public Future getOrCreateCommandClient(final String tenantId, final String deviceId, - final String replyId) { - - Objects.requireNonNull(tenantId); - Objects.requireNonNull(deviceId); - Objects.requireNonNull(replyId); - - log.debug("get or create command client for [tenantId: {}, deviceId: {}, replyId: {}]", tenantId, deviceId, - replyId); - return getOrCreateRequestResponseClient( - ResourceIdentifier.from(CommandConstants.COMMAND_ENDPOINT, tenantId, deviceId).toString(), - () -> newCommandClient(tenantId, deviceId, replyId)).map(c -> (CommandClient) c); - } - - private Future newCommandClient(final String tenantId, final String deviceId, - final String replyId) { - return checkConnected().compose(connected -> { - final Future result = Future.future(); - CommandClientImpl.create( - context, - clientConfigProperties, - connection, - tenantId, deviceId, replyId, - this::removeActiveRequestResponseClient, - this::removeActiveRequestResponseClient, - result.completer()); - return result.map(client -> (RequestResponseClient) client); - }); - } - /** * Gets an existing or creates a new request-response client for a particular service. * diff --git a/example/src/main/java/org/eclipse/hono/vertx/example/base/HonoExampleApplicationBase.java b/example/src/main/java/org/eclipse/hono/vertx/example/base/HonoExampleApplicationBase.java index b171132116..d5e3dc7f7c 100644 --- a/example/src/main/java/org/eclipse/hono/vertx/example/base/HonoExampleApplicationBase.java +++ b/example/src/main/java/org/eclipse/hono/vertx/example/base/HonoExampleApplicationBase.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2016, 2018 Contributors to the Eclipse Foundation + * Copyright (c) 2016, 2019 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -21,10 +21,10 @@ import org.apache.qpid.proton.message.Message; import org.eclipse.hono.client.CommandClient; -import org.eclipse.hono.client.HonoClient; +import org.eclipse.hono.client.HonoApplicationClient; import org.eclipse.hono.client.MessageConsumer; import org.eclipse.hono.client.ServiceInvocationException; -import org.eclipse.hono.client.impl.HonoClientImpl; +import org.eclipse.hono.client.impl.HonoApplicationClientImpl; import org.eclipse.hono.config.ClientConfigProperties; import org.eclipse.hono.util.MessageHelper; import org.eclipse.hono.util.MessageTap; @@ -60,7 +60,7 @@ public class HonoExampleApplicationBase { protected final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 1000; private final Vertx vertx = Vertx.vertx(); - private final HonoClient honoClient; + private final HonoApplicationClient honoClient; /** * A map holding a handler to cancel a timer that was started to send commands periodically to a device. @@ -94,7 +94,7 @@ public HonoExampleApplicationBase() { props.setHostnameVerificationRequired(false); } - honoClient = new HonoClientImpl(vertx, props); + honoClient = new HonoApplicationClientImpl(vertx, props); } /** diff --git a/jmeter/src/main/java/org/eclipse/hono/jmeter/client/HonoCommander.java b/jmeter/src/main/java/org/eclipse/hono/jmeter/client/HonoCommander.java index 8c30b11471..a46543a938 100644 --- a/jmeter/src/main/java/org/eclipse/hono/jmeter/client/HonoCommander.java +++ b/jmeter/src/main/java/org/eclipse/hono/jmeter/client/HonoCommander.java @@ -1,18 +1,15 @@ -/* - * ****************************************************************************** - * * Copyright (c) 2016, 2018 Contributors to the Eclipse Foundation - * * - * * See the NOTICE file(s) distributed with this work for additional - * * information regarding copyright ownership. - * * - * * This program and the accompanying materials are made available under the - * * terms of the Eclipse Public License 2.0 which is available at - * * http://www.eclipse.org/legal/epl-2.0 - * * - * * SPDX-License-Identifier: EPL-2.0 - * ****************************************************************************** +/******************************************************************************* + * Copyright (c) 2016, 2019 Contributors to the Eclipse Foundation * - */ + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + *******************************************************************************/ package org.eclipse.hono.jmeter.client; @@ -29,9 +26,10 @@ import org.apache.qpid.proton.amqp.messaging.Data; import org.apache.qpid.proton.message.Message; import org.eclipse.hono.client.CommandClient; +import org.eclipse.hono.client.HonoApplicationClient; import org.eclipse.hono.client.HonoClient; import org.eclipse.hono.client.MessageConsumer; -import org.eclipse.hono.client.impl.HonoClientImpl; +import org.eclipse.hono.client.impl.HonoApplicationClientImpl; import org.eclipse.hono.config.ClientConfigProperties; import org.eclipse.hono.jmeter.HonoCommanderSampler; import org.eclipse.hono.jmeter.sampleresult.HonoCommanderSampleResult; @@ -48,7 +46,7 @@ public class HonoCommander extends AbstractClient { private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(HonoCommander.class); - private final HonoClient client; + private final HonoApplicationClient client; private final HonoCommanderSampler sampler; private final List devicesReadyToReceiveCommands = new CopyOnWriteArrayList<>(); private final AtomicInteger successCount = new AtomicInteger(0); @@ -81,7 +79,7 @@ public HonoCommander(final HonoCommanderSampler sampler) { tenant = sampler.getTenant(); commandTimeoutInMs = sampler.getCommandTimeoutAsInt(); triggerType = sampler.getTriggerType(); - client = new HonoClientImpl(vertx, clientConfig); + client = new HonoApplicationClientImpl(vertx, clientConfig); } /** diff --git a/jmeter/src/main/java/org/eclipse/hono/jmeter/client/HonoReceiver.java b/jmeter/src/main/java/org/eclipse/hono/jmeter/client/HonoReceiver.java index 8c56b00d37..dc55c80262 100644 --- a/jmeter/src/main/java/org/eclipse/hono/jmeter/client/HonoReceiver.java +++ b/jmeter/src/main/java/org/eclipse/hono/jmeter/client/HonoReceiver.java @@ -19,9 +19,10 @@ import org.apache.jmeter.samplers.SampleResult; import org.apache.qpid.proton.message.Message; +import org.eclipse.hono.client.HonoApplicationClient; import org.eclipse.hono.client.HonoClient; import org.eclipse.hono.client.MessageConsumer; -import org.eclipse.hono.client.impl.HonoClientImpl; +import org.eclipse.hono.client.impl.HonoApplicationClientImpl; import org.eclipse.hono.config.ClientConfigProperties; import org.eclipse.hono.jmeter.HonoReceiverSampler; import org.eclipse.hono.jmeter.HonoSampler; @@ -41,7 +42,7 @@ public class HonoReceiver extends AbstractClient { private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(HonoReceiver.class); - private final HonoClient amqpNetworkClient; + private final HonoApplicationClient amqpNetworkClient; private final HonoReceiverSampler sampler; private final transient Object lock = new Object(); @@ -78,7 +79,7 @@ public HonoReceiver(final HonoReceiverSampler sampler) { clientConfig.setInitialCredits(Integer.parseInt(sampler.getPrefetch())); clientConfig.setReconnectAttempts(Integer.parseInt(sampler.getReconnectAttempts())); // amqp network config - amqpNetworkClient = new HonoClientImpl(vertx, clientConfig); + amqpNetworkClient = new HonoApplicationClientImpl(vertx, clientConfig); } /** diff --git a/tests/src/test/java/org/eclipse/hono/tests/IntegrationTestHonoClient.java b/tests/src/test/java/org/eclipse/hono/tests/IntegrationTestHonoClient.java index acba3f6d21..eec2aa6880 100644 --- a/tests/src/test/java/org/eclipse/hono/tests/IntegrationTestHonoClient.java +++ b/tests/src/test/java/org/eclipse/hono/tests/IntegrationTestHonoClient.java @@ -1,5 +1,5 @@ -/** - * Copyright (c) 2018 Contributors to the Eclipse Foundation +/******************************************************************************* + * Copyright (c) 2016, 2019 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -9,14 +9,13 @@ * http://www.eclipse.org/legal/epl-2.0 * * SPDX-License-Identifier: EPL-2.0 - */ - + *******************************************************************************/ package org.eclipse.hono.tests; import java.util.Objects; -import org.eclipse.hono.client.impl.HonoClientImpl; +import org.eclipse.hono.client.impl.HonoApplicationClientImpl; import org.eclipse.hono.config.ClientConfigProperties; import io.vertx.core.Future; @@ -27,7 +26,7 @@ * A Hono client that also allows to create generic links to a peer. * */ -public class IntegrationTestHonoClient extends HonoClientImpl { +public class IntegrationTestHonoClient extends HonoApplicationClientImpl { /** * Creates a new client.