Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[eclipse-hono#565] Implement protocol connection events
Browse files Browse the repository at this point in the history
ctron committed Apr 11, 2018
1 parent 368a35f commit 03c8982
Showing 6 changed files with 316 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -13,6 +13,10 @@

package org.eclipse.hono.adapter.mqtt;

import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE;

import java.net.HttpURLConnection;
import java.util.Objects;

@@ -40,11 +44,13 @@

import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.mqtt.MqttAuth;
import io.vertx.mqtt.MqttConnectionException;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttServerOptions;
@@ -245,6 +251,35 @@ public void doStop(final Future<Void> stopFuture) {
.compose(d -> stopFuture.complete(), stopFuture);
}

/**
* Create a future indicating a rejected connection.
*
* @param returnCode The error code to return.
* @return A future indicating a rejected connection.
*/
protected static <T> Future<T> rejected ( final MqttConnectReturnCode returnCode ) {
return Future.failedFuture(new MqttConnectionException(returnCode != null ? returnCode : MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE ));
}

/**
* Create a future indicating an accepted connection with an authenticated device.
*
* @param authenticatedDevice The device that was authenticated, may be {@code null}
* @return A future indicating an accepted connection.
*/
protected static Future<Device> accepted ( final Device authenticatedDevice ) {
return Future.succeededFuture(authenticatedDevice);
}

/**
* Create a future indicating an accepted connection without an authenticated device.
*
* @return A future indicating an accepted connection.
*/
protected static Future<Device> accepted () {
return Future.succeededFuture(null);
}

/**
* Invoked when a client sends its <em>CONNECT</em> packet.
* <p>
@@ -256,19 +291,42 @@ final void handleEndpointConnection(final MqttEndpoint endpoint) {

LOG.debug("connection request from client [clientId: {}]", endpoint.clientIdentifier());

isConnected().map(ok -> {
if (getConfig().isAuthenticationRequired()) {
handleEndpointConnectionWithAuthentication(endpoint);
isConnected()
.compose( v -> handleConnectionRequest(endpoint))
.setHandler(result -> handleConnectionRequestResult ( endpoint, result ));

}

private Future<Device> handleConnectionRequest(final MqttEndpoint endpoint) {
if (getConfig().isAuthenticationRequired()) {
return handleEndpointConnectionWithAuthentication(endpoint);
} else {
return handleEndpointConnectionWithoutAuthentication(endpoint);
}
}

private void handleConnectionRequestResult ( final MqttEndpoint endpoint, final AsyncResult<Device> done) {

if ( done.succeeded() ) {

endpoint.accept(false);
sendConnectedEvent(endpoint.clientIdentifier(), done.result());

} else {

final Throwable t = done.cause();
if ( t instanceof MqttConnectionException ) {
final MqttConnectReturnCode code = ((MqttConnectionException) t).code();
LOG.debug("connection request from client [clientId: {}] rejected with code: {}",
endpoint.clientIdentifier(), code);
endpoint.reject(code);
} else {
handleEndpointConnectionWithoutAuthentication(endpoint);
LOG.debug("connection request from client [clientId: {}] rejected: {}",
endpoint.clientIdentifier(), MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
}
return null;
}).otherwise(t -> {
LOG.debug("connection request from client [clientId: {}] rejected: {}",
endpoint.clientIdentifier(), MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
return null;
});

}
}

/**
@@ -282,39 +340,42 @@ final void handleEndpointConnection(final MqttEndpoint endpoint) {
*
* @param endpoint The MQTT endpoint representing the client.
*/
private void handleEndpointConnectionWithoutAuthentication(final MqttEndpoint endpoint) {
private Future<Device> handleEndpointConnectionWithoutAuthentication(final MqttEndpoint endpoint) {

endpoint.closeHandler(v -> {
close(endpoint);
close(endpoint, null);
LOG.debug("connection to unauthenticated device [clientId: {}] closed", endpoint.clientIdentifier());
metrics.decrementUnauthenticatedMqttConnections();
});
endpoint.publishHandler(message -> onPublishedMessage(new MqttContext(message, endpoint)));

LOG.debug("unauthenticated device [clientId: {}] connected", endpoint.clientIdentifier());
endpoint.accept(false);
metrics.incrementUnauthenticatedMqttConnections();
return accepted();
}

private void handleEndpointConnectionWithAuthentication(final MqttEndpoint endpoint) {
private Future<Device> handleEndpointConnectionWithAuthentication(final MqttEndpoint endpoint) {

if (endpoint.auth() == null) {

LOG.debug("connection request from device [clientId: {}] rejected: {}",
endpoint.clientIdentifier(), "device did not provide credentials in CONNECT packet");
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);

return rejected(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);

} else {

final DeviceCredentials credentials = getCredentials(endpoint.auth());

if (credentials == null) {

LOG.debug("connection request from device [clientId: {}] rejected: {}",
endpoint.clientIdentifier(), "device provided malformed credentials in CONNECT packet");
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
return rejected(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);

} else {

getTenantConfiguration(credentials.getTenantId()).compose(tenantConfig -> {
return getTenantConfiguration(credentials.getTenantId()).compose(tenantConfig -> {
if (tenantConfig.isAdapterEnabled(getTypeName())) {
LOG.debug("protocol adapter [{}] is enabled for tenant [{}]",
getTypeName(), credentials.getTenantId());
@@ -329,39 +390,39 @@ private void handleEndpointConnectionWithAuthentication(final MqttEndpoint endpo
final Future<Device> result = Future.future();
usernamePasswordAuthProvider.authenticate(credentials, result.completer());
return result;
}).map(authenticatedDevice -> {
}).compose(authenticatedDevice -> {
LOG.debug("successfully authenticated device [tenant-id: {}, auth-id: {}, device-id: {}]",
authenticatedDevice.getTenantId(), credentials.getAuthId(),
authenticatedDevice.getDeviceId());
onAuthenticationSuccess(endpoint, authenticatedDevice);
return null;
}).otherwise(t -> {
return accepted ( authenticatedDevice );
}).recover(t -> {
LOG.debug("cannot authenticate device [tenant-id: {}, auth-id: {}]",
credentials.getTenantId(), credentials.getAuthId(), t);
if (ServerErrorException.class.isInstance(t)) {
if (t instanceof ServerErrorException) {
// one of the services we depend on might not be available (yet)
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
return rejected(CONNECTION_REFUSED_SERVER_UNAVAILABLE);
} else {
// validation of credentials has failed
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
return rejected(CONNECTION_REFUSED_NOT_AUTHORIZED);
}
return null;
});

}
}
}

private void onAuthenticationSuccess(final MqttEndpoint endpoint, final Device authenticatedDevice) {

endpoint.closeHandler(v -> {
close(endpoint);
close(endpoint, authenticatedDevice);
LOG.debug("connection to device [tenant-id: {}, device-id: {}] closed",
authenticatedDevice.getTenantId(), authenticatedDevice.getDeviceId());
metrics.decrementMqttConnections(authenticatedDevice.getTenantId());
sendDisconnectedEvent(endpoint.clientIdentifier(), authenticatedDevice);
});

endpoint.publishHandler(message -> onPublishedMessage(new MqttContext(message, endpoint, authenticatedDevice)));
endpoint.accept(false);
metrics.incrementMqttConnections(authenticatedDevice.getTenantId());
}

@@ -527,9 +588,11 @@ private Future<Void> uploadMessage(final MqttContext ctx, final String tenant, f
* Closes a connection to a client.
*
* @param endpoint The connection to close.
* @param authenticatedDevice Optional authenticated device information, may be {@code null}.
*/
protected final void close(final MqttEndpoint endpoint) {
protected final void close(final MqttEndpoint endpoint, final Device authenticatedDevice) {
onClose(endpoint);
sendDisconnectedEvent(endpoint.clientIdentifier(), authenticatedDevice);
if (endpoint.isConnected()) {
LOG.debug("closing connection with client [client ID: {}]", endpoint.clientIdentifier());
endpoint.close();
Original file line number Diff line number Diff line change
@@ -18,6 +18,8 @@
import org.eclipse.hono.config.ClientConfigProperties;
import org.eclipse.hono.config.ProtocolAdapterProperties;
import org.eclipse.hono.service.AbstractAdapterConfig;
import org.eclipse.hono.service.monitoring.ConnectionEventProducer;
import org.eclipse.hono.service.monitoring.LoggingConnectionEventProducer;
import org.springframework.beans.factory.config.ObjectFactoryCreatingFactoryBean;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
@@ -40,7 +42,7 @@ public class Config extends AbstractAdapterConfig {
*/
@Bean(name = BEAN_NAME_VERTX_BASED_MQTT_PROTOCOL_ADAPTER)
@Scope("prototype")
public VertxBasedMqttProtocolAdapter vertxBasedMqttProtocolAdapter(){
public VertxBasedMqttProtocolAdapter vertxBasedMqttProtocolAdapter() {
return new VertxBasedMqttProtocolAdapter();
}

@@ -72,7 +74,7 @@ protected void customizeCredentialsServiceClientConfig(final RequestResponseClie
*/
@Bean
@ConfigurationProperties(prefix = "hono.app")
public ApplicationConfigProperties applicationConfigProperties(){
public ApplicationConfigProperties applicationConfigProperties() {
return new ApplicationConfigProperties();
}

@@ -98,4 +100,16 @@ public ObjectFactoryCreatingFactoryBean serviceFactory() {
factory.setTargetBeanName(BEAN_NAME_VERTX_BASED_MQTT_PROTOCOL_ADAPTER);
return factory;
}

/**
* Exposes the connection event producer implementation.
* <p>
* This defaults to a {@link LoggingConnectionEventProducer} which logs to the default level.
*
* @return The connection event producer.
*/
@Bean
public ConnectionEventProducer connectionEventProducer() {
return new LoggingConnectionEventProducer();
}
}
Original file line number Diff line number Diff line change
@@ -28,6 +28,7 @@
import org.eclipse.hono.client.TenantClient;
import org.eclipse.hono.config.ProtocolAdapterProperties;
import org.eclipse.hono.service.auth.device.Device;
import org.eclipse.hono.service.monitoring.ConnectionEventProducer;
import org.eclipse.hono.util.Constants;
import org.eclipse.hono.util.CredentialsConstants;
import org.eclipse.hono.util.MessageHelper;
@@ -66,6 +67,8 @@ public abstract class AbstractProtocolAdapterBase<T extends ProtocolAdapterPrope
private HonoClient tenantClient;
private HonoClient credentialsServiceClient;

private ConnectionEventProducer connectionEventProducer;

/**
* Sets the configuration by means of Spring dependency injection.
* <p>
@@ -171,6 +174,19 @@ public final HonoClient getCredentialsServiceClient() {
return credentialsServiceClient;
}

public void setConnectionEventProducer(final ConnectionEventProducer connectionEventProducer) {
this.connectionEventProducer = connectionEventProducer;
}

/**
* Gets the producer of connection events.
*
* @return The implementation for producing connection events. Maybe {@code null}.
*/
public ConnectionEventProducer getConnectionEventProducer() {
return this.connectionEventProducer;
}

/**
* Gets this adapter's type name.
* <p>
@@ -636,4 +652,16 @@ protected final Message newMessage(
addProperties(msg, registrationInfo);
return msg;
}

protected void sendConnectedEvent(final String connectionIdentifier, final Device authenticatedDevice) {
if (this.connectionEventProducer != null) {
this.connectionEventProducer.connected(connectionIdentifier, authenticatedDevice);
}
}

protected void sendDisconnectedEvent(final String connectionIdentifier, final Device authenticatedDevice) {
if (this.connectionEventProducer != null) {
this.connectionEventProducer.disconnected(connectionIdentifier, authenticatedDevice);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Copyright (c) 2018 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 1.0 which is available at
* https://www.eclipse.org/legal/epl-v10.html
*
* SPDX-License-Identifier: EPL-1.0
*/
package org.eclipse.hono.service.monitoring;

import org.eclipse.hono.service.auth.device.Device;

/**
* Produces connection events.
* <p>
* The interface is intended to be implemented by setups which are interested in receiving technical connection events.
* Which might not make sense for all protocols adapters. But mostly for connection oriented ones like MQTT or AMQP 1.0.
* <p>
* The protocol adapters may call {@link #connected(String, Device)} and {@link #disconnected(String, Device)} as they
* see fit. The whole process is a "best effort" process and intended to be used for monitoring/debugging.
* <p>
* When a protocol adapter calls into this producer it must provide some kind of connection ID, which might have a
* different meaning for different protocol adapters. This should be as identifiable as possible but there is no
* requirement for this to be unique. However for each connection the protocol adapter must call
* {@link #disconnected(String, Device)} with the same information as it called {@link #connected(String, Device)}.
*/
public interface ConnectionEventProducer {

/**
* Produce an event for a new connection.
*
* @param connectionIdentifier The ID of the connection which connected.
* @param authenticatedDevice The optional authenticated device associated with the connection. May be {@code null}.
*/
public void connected(String connectionIdentifier, Device authenticatedDevice);

/**
* Produce an event for a closed connection.
*
* @param connectionIdentifier The ID of the connection which disconnected.
* @param authenticatedDevice The optional authenticated device associated with the connection. May be {@code null}.
*/
public void disconnected(String connectionIdentifier, Device authenticatedDevice);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/**
* Copyright (c) 2018 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 1.0 which is available at
* https://www.eclipse.org/legal/epl-v10.html
*
* SPDX-License-Identifier: EPL-1.0
*/
package org.eclipse.hono.service.monitoring;

import static java.lang.invoke.MethodType.methodType;

import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;

import org.eclipse.hono.service.auth.device.Device;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

/**
* A logging only implementation.
*/
public class LoggingConnectionEventProducer implements ConnectionEventProducer {

private static final Logger logger = LoggerFactory.getLogger(LoggingConnectionEventProducer.class);

private final MethodHandle handle;

/**
* Create a new instance with the default log level ({@link Level#INFO}).
*/
public LoggingConnectionEventProducer() {
this(null);
}

/**
* Create a new instance with the provided log level.
*
* @param level The provided log level to use, may be {@code null} in which case {@link Level#INFO} will be used.
*/
public LoggingConnectionEventProducer(Level level) {
if (level == null) {
level = Level.INFO;
}
this.handle = loggerMethod(level.name().toLowerCase()).bindTo(logger);
}

private MethodHandle loggerMethod(final String methodName) {
try {
return MethodHandles.lookup()
.findVirtual(Logger.class,
methodName,
methodType(void.class, String.class, Object.class, Object.class));
} catch (final NoSuchMethodException | IllegalAccessException e) {
throw new RuntimeException("Failed to set up logger", e);
}
}

private void log(final String message, final Object connectionIdentifier, final Object authenticatedDevice) {
try {
logInternal(message, connectionIdentifier, authenticatedDevice);
} catch (final Throwable e) {
// we don't want a log message to fail anything
}
}

void logInternal(final String message, final Object connectionIdentifier, final Object authenticatedDevice)
throws Throwable {
this.handle.invokeExact(message, connectionIdentifier, authenticatedDevice);
}

@Override
public void connected(final String connectionIdentifier, final Device authenticatedDevice) {
log(" Connected - ID: {}, Device: {}", connectionIdentifier, authenticatedDevice);
}

@Override
public void disconnected(final String connectionIdentifier, final Device authenticatedDevice) {
log("Disconnected - ID: {}, Device: {}", connectionIdentifier, authenticatedDevice);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Copyright (c) 2018 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 1.0 which is available at
* https://www.eclipse.org/legal/epl-v10.html
*
* SPDX-License-Identifier: EPL-1.0
*/

package org.eclipse.hono.service.monitoring;

import org.eclipse.hono.service.auth.device.Device;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.event.Level;

/**
* Test if all levels can be used for logging.
*/
@RunWith(Parameterized.class)
public class LoggingConnectionEventProducerLevelsTest {

@Parameters
public static Level[] levels() {
return Level.values();
}

private Level level;

public LoggingConnectionEventProducerLevelsTest(Level level) {
this.level = level;
}

@Test
public void testLevel() throws Throwable {
final LoggingConnectionEventProducer l = new LoggingConnectionEventProducer(this.level);
l.logInternal("Foo", "bar", new Device("DEFAULT_TENANT", "device1"));
}
}

0 comments on commit 03c8982

Please sign in to comment.