Skip to content

Commit

Permalink
[#565] Implement protocol connection events
Browse files Browse the repository at this point in the history
This change adds connection events which protocol adapters may send out
in order to notify listeners that a device has connected/disconnected.

Those events are "best effort", but there are no guarantees about them.
Hono supports different ways of handling those events and comes with two
default implementations. One "logging" implementation which simply logs
to the logging system. And one event base implementation, which sends
out events using the *Hono Event API*. The implementations may be wired
up using the Spring Boot configuration.
  • Loading branch information
ctron committed May 4, 2018
1 parent abd0d9b commit f0205f9
Show file tree
Hide file tree
Showing 9 changed files with 476 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@

import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.mqtt.MqttAuth;
import io.vertx.mqtt.MqttConnectionException;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttServerOptions;
Expand Down Expand Up @@ -94,12 +96,12 @@ public int getInsecurePortDefaultValue() {

@Override
protected final int getActualPort() {
return (server != null ? server.actualPort() : Constants.PORT_UNCONFIGURED);
return server != null ? server.actualPort() : Constants.PORT_UNCONFIGURED;
}

@Override
protected final int getActualInsecurePort() {
return (insecureServer != null ? insecureServer.actualPort() : Constants.PORT_UNCONFIGURED);
return insecureServer != null ? insecureServer.actualPort() : Constants.PORT_UNCONFIGURED;
}

/**
Expand Down Expand Up @@ -187,7 +189,7 @@ private Future<Void> bindInsecureMqttServer() {
private Future<MqttServer> bindMqttServer(final MqttServerOptions options, final MqttServer mqttServer) {

final Future<MqttServer> result = Future.future();
final MqttServer createdMqttServer = (mqttServer == null ? MqttServer.create(this.vertx, options) : mqttServer);
final MqttServer createdMqttServer = mqttServer == null ? MqttServer.create(this.vertx, options) : mqttServer;

createdMqttServer
.endpointHandler(this::handleEndpointConnection)
Expand Down Expand Up @@ -245,6 +247,36 @@ public void doStop(final Future<Void> stopFuture) {
.compose(d -> stopFuture.complete(), stopFuture);
}

/**
* Create a future indicating a rejected connection.
*
* @param returnCode The error code to return.
* @return A future indicating a rejected connection.
*/
protected static <T> Future<T> rejected(final MqttConnectReturnCode returnCode) {
return Future.failedFuture(new MqttConnectionException(
returnCode != null ? returnCode : MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE));
}

/**
* Create a future indicating an accepted connection with an authenticated device.
*
* @param authenticatedDevice The device that was authenticated, may be {@code null}
* @return A future indicating an accepted connection.
*/
protected static Future<Device> accepted(final Device authenticatedDevice) {
return Future.succeededFuture(authenticatedDevice);
}

/**
* Create a future indicating an accepted connection without an authenticated device.
*
* @return A future indicating an accepted connection.
*/
protected static Future<Device> accepted() {
return Future.succeededFuture(null);
}

/**
* Invoked when a client sends its <em>CONNECT</em> packet.
* <p>
Expand All @@ -256,65 +288,104 @@ final void handleEndpointConnection(final MqttEndpoint endpoint) {

LOG.debug("connection request from client [clientId: {}]", endpoint.clientIdentifier());

isConnected().map(ok -> {
if (getConfig().isAuthenticationRequired()) {
handleEndpointConnectionWithAuthentication(endpoint);
isConnected()
.compose(v -> handleConnectionRequest(endpoint))
.setHandler(result -> handleConnectionRequestResult(endpoint, result));

}

private Future<Device> handleConnectionRequest(final MqttEndpoint endpoint) {
if (getConfig().isAuthenticationRequired()) {
return handleEndpointConnectionWithAuthentication(endpoint);
} else {
return handleEndpointConnectionWithoutAuthentication(endpoint);
}
}

private void handleConnectionRequestResult(final MqttEndpoint endpoint,
final AsyncResult<Device> authenticationAttempt) {

if (authenticationAttempt.succeeded()) {

sendConnectedEvent(endpoint.clientIdentifier(), authenticationAttempt.result())
.setHandler(sendAttempt -> {
if (sendAttempt.succeeded()) {
endpoint.accept(false);
} else {
LOG.warn(
"connection request from client [clientId: {}] rejected due to connection event "
+ "failure: {}",
endpoint.clientIdentifier(),
MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE,
sendAttempt.cause());
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
}
});

} else {

final Throwable t = authenticationAttempt.cause();
if (t instanceof MqttConnectionException) {
final MqttConnectReturnCode code = ((MqttConnectionException) t).code();
LOG.debug("connection request from client [clientId: {}] rejected with code: {}",
endpoint.clientIdentifier(), code);
endpoint.reject(code);
} else {
handleEndpointConnectionWithoutAuthentication(endpoint);
LOG.debug("connection request from client [clientId: {}] rejected: {}",
endpoint.clientIdentifier(), MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
}
return null;
}).otherwise(t -> {
LOG.debug("connection request from client [clientId: {}] rejected: {}",
endpoint.clientIdentifier(), MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
return null;
});

}
}

/**
* Invoked when a client sends its <em>CONNECT</em> packet and client authentication has been disabled by setting
* the {@linkplain ProtocolAdapterProperties#isAuthenticationRequired() authentication required} configuration
* property to {@code false}.
* <p>
* Registers a close handler on the endpoint which invokes {@link #close(MqttEndpoint)}. Registers a publish handler
* on the endpoint which invokes {@link #onPublishedMessage(MqttContext)} for each message being published by the
* client. Accepts the connection request.
* Registers a close handler on the endpoint which invokes {@link #close(MqttEndpoint, Device)}. Registers a publish
* handler on the endpoint which invokes {@link #onPublishedMessage(MqttContext)} for each message being published
* by the client. Accepts the connection request.
*
* @param endpoint The MQTT endpoint representing the client.
*/
private void handleEndpointConnectionWithoutAuthentication(final MqttEndpoint endpoint) {
private Future<Device> handleEndpointConnectionWithoutAuthentication(final MqttEndpoint endpoint) {

endpoint.closeHandler(v -> {
close(endpoint);
close(endpoint, null);
LOG.debug("connection to unauthenticated device [clientId: {}] closed", endpoint.clientIdentifier());
metrics.decrementUnauthenticatedMqttConnections();
});
endpoint.publishHandler(message -> onPublishedMessage(new MqttContext(message, endpoint)));

LOG.debug("unauthenticated device [clientId: {}] connected", endpoint.clientIdentifier());
endpoint.accept(false);
metrics.incrementUnauthenticatedMqttConnections();
return accepted();
}

private void handleEndpointConnectionWithAuthentication(final MqttEndpoint endpoint) {
private Future<Device> handleEndpointConnectionWithAuthentication(final MqttEndpoint endpoint) {

if (endpoint.auth() == null) {

LOG.debug("connection request from device [clientId: {}] rejected: {}",
endpoint.clientIdentifier(), "device did not provide credentials in CONNECT packet");
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);

return rejected(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);

} else {

final DeviceCredentials credentials = getCredentials(endpoint.auth());

if (credentials == null) {

LOG.debug("connection request from device [clientId: {}] rejected: {}",
endpoint.clientIdentifier(), "device provided malformed credentials in CONNECT packet");
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
return rejected(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);

} else {

getTenantConfiguration(credentials.getTenantId()).compose(tenantConfig -> {
return getTenantConfiguration(credentials.getTenantId()).compose(tenantConfig -> {
if (tenantConfig.isAdapterEnabled(getTypeName())) {
LOG.debug("protocol adapter [{}] is enabled for tenant [{}]",
getTypeName(), credentials.getTenantId());
Expand All @@ -329,39 +400,38 @@ private void handleEndpointConnectionWithAuthentication(final MqttEndpoint endpo
final Future<Device> result = Future.future();
usernamePasswordAuthProvider.authenticate(credentials, result.completer());
return result;
}).map(authenticatedDevice -> {
}).compose(authenticatedDevice -> {
LOG.debug("successfully authenticated device [tenant-id: {}, auth-id: {}, device-id: {}]",
authenticatedDevice.getTenantId(), credentials.getAuthId(),
authenticatedDevice.getDeviceId());
onAuthenticationSuccess(endpoint, authenticatedDevice);
return null;
}).otherwise(t -> {
return accepted(authenticatedDevice);
}).recover(t -> {
LOG.debug("cannot authenticate device [tenant-id: {}, auth-id: {}]",
credentials.getTenantId(), credentials.getAuthId(), t);
if (ServerErrorException.class.isInstance(t)) {
if (t instanceof ServerErrorException) {
// one of the services we depend on might not be available (yet)
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
return rejected(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
} else {
// validation of credentials has failed
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
return rejected(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
}
return null;
});

}
}
}

private void onAuthenticationSuccess(final MqttEndpoint endpoint, final Device authenticatedDevice) {

endpoint.closeHandler(v -> {
close(endpoint);
close(endpoint, authenticatedDevice);
LOG.debug("connection to device [tenant-id: {}, device-id: {}] closed",
authenticatedDevice.getTenantId(), authenticatedDevice.getDeviceId());
metrics.decrementMqttConnections(authenticatedDevice.getTenantId());
});

endpoint.publishHandler(message -> onPublishedMessage(new MqttContext(message, endpoint, authenticatedDevice)));
endpoint.accept(false);
metrics.incrementMqttConnections(authenticatedDevice.getTenantId());
}

Expand Down Expand Up @@ -535,9 +605,11 @@ private Future<Void> uploadMessage(final MqttContext ctx, final String tenant, f
* Closes a connection to a client.
*
* @param endpoint The connection to close.
* @param authenticatedDevice Optional authenticated device information, may be {@code null}.
*/
protected final void close(final MqttEndpoint endpoint) {
protected final void close(final MqttEndpoint endpoint, final Device authenticatedDevice) {
onClose(endpoint);
sendDisconnectedEvent(endpoint.clientIdentifier(), authenticatedDevice);
if (endpoint.isConnected()) {
LOG.debug("closing connection with client [client ID: {}]", endpoint.clientIdentifier());
endpoint.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import org.eclipse.hono.config.ClientConfigProperties;
import org.eclipse.hono.config.ProtocolAdapterProperties;
import org.eclipse.hono.service.AbstractAdapterConfig;
import org.eclipse.hono.service.monitoring.ConnectionEventProducer;
import org.eclipse.hono.service.monitoring.LoggingConnectionEventProducer;
import org.springframework.beans.factory.config.ObjectFactoryCreatingFactoryBean;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
Expand All @@ -40,7 +42,7 @@ public class Config extends AbstractAdapterConfig {
*/
@Bean(name = BEAN_NAME_VERTX_BASED_MQTT_PROTOCOL_ADAPTER)
@Scope("prototype")
public VertxBasedMqttProtocolAdapter vertxBasedMqttProtocolAdapter(){
public VertxBasedMqttProtocolAdapter vertxBasedMqttProtocolAdapter() {
return new VertxBasedMqttProtocolAdapter();
}

Expand Down Expand Up @@ -72,7 +74,7 @@ protected void customizeCredentialsServiceClientConfig(final RequestResponseClie
*/
@Bean
@ConfigurationProperties(prefix = "hono.app")
public ApplicationConfigProperties applicationConfigProperties(){
public ApplicationConfigProperties applicationConfigProperties() {
return new ApplicationConfigProperties();
}

Expand All @@ -98,4 +100,16 @@ public ObjectFactoryCreatingFactoryBean serviceFactory() {
factory.setTargetBeanName(BEAN_NAME_VERTX_BASED_MQTT_PROTOCOL_ADAPTER);
return factory;
}

/**
* Exposes the connection event producer implementation.
* <p>
* This defaults to a {@link LoggingConnectionEventProducer} which logs to the default level.
*
* @return The connection event producer.
*/
@Bean
public ConnectionEventProducer connectionEventProducer() {
return new LoggingConnectionEventProducer();
}
}
8 changes: 7 additions & 1 deletion core/src/main/java/org/eclipse/hono/util/EventConstants.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2016 Bosch Software Innovations GmbH.
* Copyright (c) 2016, 2018 Bosch Software Innovations GmbH and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
Expand All @@ -8,6 +8,7 @@
*
* Contributors:
* Bosch Software Innovations GmbH - initial creation
* Red Hat Inc
*
*/
package org.eclipse.hono.util;
Expand All @@ -27,6 +28,11 @@ public final class EventConstants {
*/
public static final String EVENT_ENDPOINT_SHORT = "e";

/**
* The content type of the <em>connection notification</em> event.
*/
public static final String EVENT_CONNECTION_NOTIFICATION_CONTENT_TYPE = "application/vnd.eclipse-hono-dc-notification+json";

private EventConstants() {
}
}
Loading

0 comments on commit f0205f9

Please sign in to comment.