Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#780] Extract methods out of HonoClient into new HonoApplicationClient #1085

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cli/src/main/java/org/eclipse/hono/cli/AbstractClient.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2016, 2018 Contributors to the Eclipse Foundation
* Copyright (c) 2016, 2019 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand All @@ -15,7 +15,7 @@

import java.util.Objects;

import org.eclipse.hono.client.HonoClient;
import org.eclipse.hono.client.HonoApplicationClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

Expand All @@ -31,10 +31,10 @@ abstract class AbstractClient extends AbstractCliClient {
protected String deviceId;
@Value(value = "${connection.retryInterval}")
protected int connectionRetryInterval;
protected HonoClient client;
protected HonoApplicationClient client;

@Autowired
public final void setHonoClient(final HonoClient client) {
public final void setHonoClient(final HonoApplicationClient client) {
this.client = Objects.requireNonNull(client);
}
}
12 changes: 6 additions & 6 deletions cli/src/main/java/org/eclipse/hono/cli/AppConfiguration.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2016, 2018 Contributors to the Eclipse Foundation
* Copyright (c) 2016, 2019 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand All @@ -13,8 +13,8 @@

package org.eclipse.hono.cli;

import org.eclipse.hono.client.HonoClient;
import org.eclipse.hono.client.impl.HonoClientImpl;
import org.eclipse.hono.client.HonoApplicationClient;
import org.eclipse.hono.client.impl.HonoApplicationClientImpl;
import org.eclipse.hono.config.ClientConfigProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
Expand Down Expand Up @@ -68,12 +68,12 @@ public ClientConfigProperties honoClientConfig() {
}

/**
* Exposes a {@code HonoClient} as a Spring bean.
* Exposes a {@code HonoApplicationClient} as a Spring bean.
*
* @return The Hono client.
*/
@Bean
public HonoClient honoClient() {
return new HonoClientImpl(vertx(), honoClientConfig());
public HonoApplicationClient honoClient() {
return new HonoApplicationClientImpl(vertx(), honoClientConfig());
}
}
15 changes: 7 additions & 8 deletions cli/src/main/java/org/eclipse/hono/cli/Receiver.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2016, 2018 Contributors to the Eclipse Foundation
* Copyright (c) 2016, 2019 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand All @@ -20,7 +20,6 @@
import io.vertx.core.buffer.Buffer;

import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.HonoClient;
import org.eclipse.hono.util.MessageHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Profile;
Expand Down Expand Up @@ -62,28 +61,28 @@ public class Receiver extends AbstractClient {
@PostConstruct
Future<CompositeFuture> start() {
return client.connect(this::onDisconnect)
.compose(this::createConsumer)
.compose(res -> createConsumer())
.setHandler(this::handleCreateConsumerStatus);
}

private CompositeFuture createConsumer(final HonoClient connectedClient) {
private CompositeFuture createConsumer() {
final Handler<Void> closeHandler = closeHook -> {
LOG.info("close handler of consumer is called");
vertx.setTimer(connectionRetryInterval, reconnect -> {
LOG.info("attempting to re-open the consumer link ...");
createConsumer(connectedClient);
createConsumer();
});
};

@SuppressWarnings("rawtypes")
final List<Future> consumerFutures = new ArrayList<>();
if (messageType.equals(TYPE_EVENT) || messageType.equals(TYPE_ALL)) {
consumerFutures.add(
connectedClient.createEventConsumer(tenantId, msg -> handleMessage(TYPE_EVENT, msg), closeHandler));
client.createEventConsumer(tenantId, msg -> handleMessage(TYPE_EVENT, msg), closeHandler));
}

if (messageType.equals(TYPE_TELEMETRY) || messageType.equals(TYPE_ALL)) {
consumerFutures.add(connectedClient
consumerFutures.add(client
.createTelemetryConsumer(tenantId, msg -> handleMessage(TYPE_TELEMETRY, msg), closeHandler));
}

Expand All @@ -101,7 +100,7 @@ private void onDisconnect(final ProtonConnection con) {
vertx.setTimer(connectionRetryInterval, reconnect -> {
LOG.info("attempting to re-connect to Hono ...");
client.connect(this::onDisconnect)
.compose(this::createConsumer);
.compose(res -> createConsumer());
});
}

Expand Down
6 changes: 3 additions & 3 deletions cli/src/test/java/org/eclipse/hono/cli/ReceiverTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2016, 2018 Contributors to the Eclipse Foundation
* Copyright (c) 2016, 2019 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand All @@ -21,7 +21,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import org.eclipse.hono.client.HonoClient;
import org.eclipse.hono.client.HonoApplicationClient;
import org.eclipse.hono.client.MessageConsumer;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -63,7 +63,7 @@ public void setup() {
final Vertx vertx = mock(Vertx.class);
when(vertx.getOrCreateContext()).thenReturn(mock(Context.class));

final HonoClient client = mock(HonoClient.class);
final HonoApplicationClient client = mock(HonoApplicationClient.class);
when(client.connect(any(Handler.class))).thenReturn(Future.succeededFuture(client));
when(client.connect()).thenReturn(Future.succeededFuture(client));
when(client.createTelemetryConsumer(anyString(), any(Consumer.class), any(Handler.class)))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*******************************************************************************
* Copyright (c) 2016, 2019 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*******************************************************************************/

package org.eclipse.hono.client;

import java.util.function.BiConsumer;
import java.util.function.Consumer;

import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.config.ClientConfigProperties;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.proton.ProtonDelivery;

/**
* A factory for creating clients for Hono's APIs.
* <p>
* A factory maintains a single AMQP 1.0 connection and a single session to the peer. This session is shared by all AMQP
* 1.0 links established for <em>senders</em>, <em>consumers</em> and <em>clients</em> created using the corresponding
* factory methods.
* <p>
* The <em>getOrCreate</em> factory methods return an existing client for the given address if available. Note that
* factory methods for creating consumers <em>always</em> return a new instance so that all messages received are only
* processed by the handler passed in to the factory method.
* <p>
* Before any of the factory methods can be invoked successfully, the client needs to connect to Hono. This is done by
* invoking one of the client's <em>connect</em> methods.
* <p>
* An AMQP connection is established in multiple stages:
* <ol>
* <li>The client establishes a TCP connection to the peer. For this to succeed, the peer must have registered a
* socket listener on the IP address and port that the client is configured to use.</li>
* <li>The client performs a SASL handshake with the peer if required by the peer. The client needs to be
* configured with correct credentials in order for this stage to succeed.</li>
* <li>Finally, the client and the peer need to agree on AMQP 1.0 specific connection parameters like capabilities
* and session window size.</li>
* </ol>
* Some of the <em>connect</em> methods accept a {@code ProtonClientOptions} type parameter. Note that these options
* only influence the client's behavior when establishing the TCP connection with the peer. The overall behavior of
* the client regarding the establishment of the AMQP connection must be configured using the
* {@link ClientConfigProperties} passed in to the <em>newClient</em> method. In particular, the
* {@link ClientConfigProperties#setReconnectAttempts(int)} method can be used to specify, how many
* times the client should try to establish a connection to the peer before giving up.
* <p>
* <em>NB</em> When the client tries to establish a connection to the peer, it stores the <em>current</em>
* vert.x {@code Context} in a local variable and performs all further interactions with the peer running
* on this Context. Invoking any of the methods of the client from a vert.x Context other than the one
* used for establishing the connection may cause race conditions or even deadlocks because the handlers
* registered on the {@code Future}s returned by these methods will be invoked from the stored Context.
* It is the invoking code's responsibility to either ensure that the client's methods are always invoked
* from the same Context or to make sure that the handlers are running on the correct Context, e.g. by using
* the {@code Context}'s <em>runOnContext</em> method.
*/
public interface HonoApplicationClient extends HonoClient {

/**
* Creates a client for consuming data from Hono's north bound <em>Telemetry API</em>.
*
* @param tenantId The tenant to consume data for.
* @param telemetryConsumer The handler to invoke with every message received.
* @param closeHandler The handler invoked when the peer detaches the link.
* @return A future that will complete with the consumer once the link has been established. The future will fail if
* the link cannot be established, e.g. because this client is not connected.
* @throws NullPointerException if any of the parameters is {@code null}.
*/
Future<MessageConsumer> createTelemetryConsumer(String tenantId, Consumer<Message> telemetryConsumer,
Handler<Void> closeHandler);

/**
* Creates a client for consuming events from Hono's north bound <em>Event API</em>.
* <p>
* The events passed in to the event consumer will be settled automatically if the consumer does not throw an
* exception.
*
* @param tenantId The tenant to consume events for.
* @param eventConsumer The handler to invoke with every event received.
* @param closeHandler The handler invoked when the peer detaches the link.
* @return A future that will complete with the consumer once the link has been established. The future will fail if
* the link cannot be established, e.g. because this client is not connected.
* @throws NullPointerException if any of the parameters is {@code null}.
*/
Future<MessageConsumer> createEventConsumer(String tenantId, Consumer<Message> eventConsumer,
Handler<Void> closeHandler);

/**
* Creates a client for consuming events from Hono's north bound <em>Event API</em>.
* <p>
* The events passed in to the event consumer will be settled automatically if the consumer does not throw an
* exception and does not manually handle the message disposition using the passed in delivery.
*
* @param tenantId The tenant to consume events for.
* @param eventConsumer The handler to invoke with every event received.
* @param closeHandler The handler invoked when the peer detaches the link.
* @return A future that will complete with the consumer once the link has been established. The future will fail if
* the link cannot be established, e.g. because this client is not connected.
* @throws NullPointerException if any of the parameters is {@code null}.
*/
Future<MessageConsumer> createEventConsumer(String tenantId, BiConsumer<ProtonDelivery, Message> eventConsumer,
Handler<Void> closeHandler);

/**
* Gets a client for sending commands to a device.
* <p>
* The client returned may be either newly created or it may be an existing
* client for the given device.
* <p>
* This method will use an implementation specific mechanism (e.g. a UUID) to create
* a unique reply-to address to be included in commands sent to the device. The protocol
* adapters need to convey an encoding of the reply-to address to the device when delivering
* the command. Consequently, the number of bytes transferred to the device depends on the
* length of the reply-to address being used. In situations where this is a major concern it
* might be advisable to use {@link #getOrCreateCommandClient(String, String, String)} for
* creating a command client and provide custom (and shorter) <em>reply-to identifier</em>
* to be used in the reply-to address.
*
* @param tenantId The tenant that the device belongs to.
* @param deviceId The device to send the commands to.
* @return A future that will complete with the command and control client (if successful) or
* fail if the client cannot be created, e.g. because the underlying connection
* is not established or if a concurrent request to create a client for the same
* tenant and device is already being executed.
* @throws NullPointerException if the tenantId is {@code null}.
*/
Future<CommandClient> getOrCreateCommandClient(String tenantId, String deviceId);

/**
* Gets a client for sending commands to a device.
* <p>
* The client returned may be either newly created or it may be an existing
* client for the given device.
*
* @param tenantId The tenant that the device belongs to.
* @param deviceId The device to send the commands to.
* @param replyId An arbitrary string which (in conjunction with the tenant and device ID) uniquely
* identifies this command client.
* This identifier will only be used for creating a <em>new</em> client for the device.
* If this method returns an existing client for the device then the client will use
* the reply-to address determined during its initial creation. In particular, this
* means that if the (existing) client has originally been created using the
* {@link #getOrCreateCommandClient(String, String)} method, then the reply-to address
* used by the client will most likely not contain the given identifier.
* @return A future that will complete with the command and control client (if successful) or
* fail if the client cannot be created, e.g. because the underlying connection
* is not established or if a concurrent request to create a client for the same
* tenant and device is already being executed.
* @throws NullPointerException if the tenantId is {@code null}.
*/
Future<CommandClient> getOrCreateCommandClient(String tenantId, String deviceId, String replyId);

}
Loading