Skip to content

Commit

Permalink
[eclipse-hono#565] Implement protocol connection events
Browse files Browse the repository at this point in the history
  • Loading branch information
ctron committed Apr 16, 2018
1 parent 3fce085 commit 6b6c644
Show file tree
Hide file tree
Showing 6 changed files with 355 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@

import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.mqtt.MqttAuth;
import io.vertx.mqtt.MqttConnectionException;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttServerOptions;
Expand Down Expand Up @@ -245,6 +247,35 @@ public void doStop(final Future<Void> stopFuture) {
.compose(d -> stopFuture.complete(), stopFuture);
}

/**
* Create a future indicating a rejected connection.
*
* @param returnCode The error code to return.
* @return A future indicating a rejected connection.
*/
protected static <T> Future<T> rejected ( final MqttConnectReturnCode returnCode ) {
return Future.failedFuture(new MqttConnectionException(returnCode != null ? returnCode : MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE ));
}

/**
* Create a future indicating an accepted connection with an authenticated device.
*
* @param authenticatedDevice The device that was authenticated, may be {@code null}
* @return A future indicating an accepted connection.
*/
protected static Future<Device> accepted ( final Device authenticatedDevice ) {
return Future.succeededFuture(authenticatedDevice);
}

/**
* Create a future indicating an accepted connection without an authenticated device.
*
* @return A future indicating an accepted connection.
*/
protected static Future<Device> accepted () {
return Future.succeededFuture(null);
}

/**
* Invoked when a client sends its <em>CONNECT</em> packet.
* <p>
Expand All @@ -256,19 +287,42 @@ final void handleEndpointConnection(final MqttEndpoint endpoint) {

LOG.debug("connection request from client [clientId: {}]", endpoint.clientIdentifier());

isConnected().map(ok -> {
if (getConfig().isAuthenticationRequired()) {
handleEndpointConnectionWithAuthentication(endpoint);
isConnected()
.compose( v -> handleConnectionRequest(endpoint))
.setHandler(result -> handleConnectionRequestResult ( endpoint, result ));

}

private Future<Device> handleConnectionRequest(final MqttEndpoint endpoint) {
if (getConfig().isAuthenticationRequired()) {
return handleEndpointConnectionWithAuthentication(endpoint);
} else {
return handleEndpointConnectionWithoutAuthentication(endpoint);
}
}

private void handleConnectionRequestResult ( final MqttEndpoint endpoint, final AsyncResult<Device> done) {

if ( done.succeeded() ) {

endpoint.accept(false);
sendConnectedEvent(endpoint.clientIdentifier(), done.result());

} else {

final Throwable t = done.cause();
if ( t instanceof MqttConnectionException ) {
final MqttConnectReturnCode code = ((MqttConnectionException) t).code();
LOG.debug("connection request from client [clientId: {}] rejected with code: {}",
endpoint.clientIdentifier(), code);
endpoint.reject(code);
} else {
handleEndpointConnectionWithoutAuthentication(endpoint);
LOG.debug("connection request from client [clientId: {}] rejected: {}",
endpoint.clientIdentifier(), MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
}
return null;
}).otherwise(t -> {
LOG.debug("connection request from client [clientId: {}] rejected: {}",
endpoint.clientIdentifier(), MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
return null;
});

}
}

/**
Expand All @@ -282,39 +336,42 @@ final void handleEndpointConnection(final MqttEndpoint endpoint) {
*
* @param endpoint The MQTT endpoint representing the client.
*/
private void handleEndpointConnectionWithoutAuthentication(final MqttEndpoint endpoint) {
private Future<Device> handleEndpointConnectionWithoutAuthentication(final MqttEndpoint endpoint) {

endpoint.closeHandler(v -> {
close(endpoint);
close(endpoint, null);
LOG.debug("connection to unauthenticated device [clientId: {}] closed", endpoint.clientIdentifier());
metrics.decrementUnauthenticatedMqttConnections();
});
endpoint.publishHandler(message -> onPublishedMessage(new MqttContext(message, endpoint)));

LOG.debug("unauthenticated device [clientId: {}] connected", endpoint.clientIdentifier());
endpoint.accept(false);
metrics.incrementUnauthenticatedMqttConnections();
return accepted();
}

private void handleEndpointConnectionWithAuthentication(final MqttEndpoint endpoint) {
private Future<Device> handleEndpointConnectionWithAuthentication(final MqttEndpoint endpoint) {

if (endpoint.auth() == null) {

LOG.debug("connection request from device [clientId: {}] rejected: {}",
endpoint.clientIdentifier(), "device did not provide credentials in CONNECT packet");
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);

return rejected(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);

} else {

final DeviceCredentials credentials = getCredentials(endpoint.auth());

if (credentials == null) {

LOG.debug("connection request from device [clientId: {}] rejected: {}",
endpoint.clientIdentifier(), "device provided malformed credentials in CONNECT packet");
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
return rejected(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);

} else {

getTenantConfiguration(credentials.getTenantId()).compose(tenantConfig -> {
return getTenantConfiguration(credentials.getTenantId()).compose(tenantConfig -> {
if (tenantConfig.isAdapterEnabled(getTypeName())) {
LOG.debug("protocol adapter [{}] is enabled for tenant [{}]",
getTypeName(), credentials.getTenantId());
Expand All @@ -329,39 +386,38 @@ private void handleEndpointConnectionWithAuthentication(final MqttEndpoint endpo
final Future<Device> result = Future.future();
usernamePasswordAuthProvider.authenticate(credentials, result.completer());
return result;
}).map(authenticatedDevice -> {
}).compose(authenticatedDevice -> {
LOG.debug("successfully authenticated device [tenant-id: {}, auth-id: {}, device-id: {}]",
authenticatedDevice.getTenantId(), credentials.getAuthId(),
authenticatedDevice.getDeviceId());
onAuthenticationSuccess(endpoint, authenticatedDevice);
return null;
}).otherwise(t -> {
return accepted ( authenticatedDevice );
}).recover(t -> {
LOG.debug("cannot authenticate device [tenant-id: {}, auth-id: {}]",
credentials.getTenantId(), credentials.getAuthId(), t);
if (ServerErrorException.class.isInstance(t)) {
if (t instanceof ServerErrorException) {
// one of the services we depend on might not be available (yet)
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
return rejected(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
} else {
// validation of credentials has failed
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
return rejected(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
}
return null;
});

}
}
}

private void onAuthenticationSuccess(final MqttEndpoint endpoint, final Device authenticatedDevice) {

endpoint.closeHandler(v -> {
close(endpoint);
close(endpoint, authenticatedDevice);
LOG.debug("connection to device [tenant-id: {}, device-id: {}] closed",
authenticatedDevice.getTenantId(), authenticatedDevice.getDeviceId());
metrics.decrementMqttConnections(authenticatedDevice.getTenantId());
});

endpoint.publishHandler(message -> onPublishedMessage(new MqttContext(message, endpoint, authenticatedDevice)));
endpoint.accept(false);
metrics.incrementMqttConnections(authenticatedDevice.getTenantId());
}

Expand Down Expand Up @@ -527,9 +583,11 @@ private Future<Void> uploadMessage(final MqttContext ctx, final String tenant, f
* Closes a connection to a client.
*
* @param endpoint The connection to close.
* @param authenticatedDevice Optional authenticated device information, may be {@code null}.
*/
protected final void close(final MqttEndpoint endpoint) {
protected final void close(final MqttEndpoint endpoint, final Device authenticatedDevice) {
onClose(endpoint);
sendDisconnectedEvent(endpoint.clientIdentifier(), authenticatedDevice);
if (endpoint.isConnected()) {
LOG.debug("closing connection with client [client ID: {}]", endpoint.clientIdentifier());
endpoint.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import org.eclipse.hono.config.ClientConfigProperties;
import org.eclipse.hono.config.ProtocolAdapterProperties;
import org.eclipse.hono.service.AbstractAdapterConfig;
import org.eclipse.hono.service.monitoring.ConnectionEventProducer;
import org.eclipse.hono.service.monitoring.LoggingConnectionEventProducer;
import org.springframework.beans.factory.config.ObjectFactoryCreatingFactoryBean;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
Expand All @@ -40,7 +42,7 @@ public class Config extends AbstractAdapterConfig {
*/
@Bean(name = BEAN_NAME_VERTX_BASED_MQTT_PROTOCOL_ADAPTER)
@Scope("prototype")
public VertxBasedMqttProtocolAdapter vertxBasedMqttProtocolAdapter(){
public VertxBasedMqttProtocolAdapter vertxBasedMqttProtocolAdapter() {
return new VertxBasedMqttProtocolAdapter();
}

Expand Down Expand Up @@ -72,7 +74,7 @@ protected void customizeCredentialsServiceClientConfig(final RequestResponseClie
*/
@Bean
@ConfigurationProperties(prefix = "hono.app")
public ApplicationConfigProperties applicationConfigProperties(){
public ApplicationConfigProperties applicationConfigProperties() {
return new ApplicationConfigProperties();
}

Expand All @@ -98,4 +100,16 @@ public ObjectFactoryCreatingFactoryBean serviceFactory() {
factory.setTargetBeanName(BEAN_NAME_VERTX_BASED_MQTT_PROTOCOL_ADAPTER);
return factory;
}

/**
* Exposes the connection event producer implementation.
* <p>
* This defaults to a {@link LoggingConnectionEventProducer} which logs to the default level.
*
* @return The connection event producer.
*/
@Bean
public ConnectionEventProducer connectionEventProducer() {
return new LoggingConnectionEventProducer();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.util.Objects;
import java.util.Optional;

import io.vertx.core.Handler;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.message.Message;
Expand All @@ -28,18 +27,20 @@
import org.eclipse.hono.client.TenantClient;
import org.eclipse.hono.config.ProtocolAdapterProperties;
import org.eclipse.hono.service.auth.device.Device;
import org.eclipse.hono.service.monitoring.ConnectionEventProducer;
import org.eclipse.hono.util.Constants;
import org.eclipse.hono.util.CredentialsConstants;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.RegistrationConstants;
import org.eclipse.hono.util.Strings;
import org.eclipse.hono.util.TenantObject;
import org.eclipse.hono.util.TenantConstants;
import org.eclipse.hono.util.TenantObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;

import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.healthchecks.HealthCheckHandler;
Expand All @@ -66,6 +67,8 @@ public abstract class AbstractProtocolAdapterBase<T extends ProtocolAdapterPrope
private HonoClient tenantClient;
private HonoClient credentialsServiceClient;

private ConnectionEventProducer connectionEventProducer;

/**
* Sets the configuration by means of Spring dependency injection.
* <p>
Expand Down Expand Up @@ -171,6 +174,26 @@ public final HonoClient getCredentialsServiceClient() {
return credentialsServiceClient;
}

/**
* Sets the producer for connections events.
*
* @param connectionEventProducer The instance which will handle the production of connection events. Depending on
* the setup this could be a simple log message or an event using the Hono Event API.
*/
@Autowired
public void setConnectionEventProducer(final ConnectionEventProducer connectionEventProducer) {
this.connectionEventProducer = connectionEventProducer;
}

/**
* Gets the producer of connection events.
*
* @return The implementation for producing connection events. Maybe {@code null}.
*/
public ConnectionEventProducer getConnectionEventProducer() {
return this.connectionEventProducer;
}

/**
* Gets this adapter's type name.
* <p>
Expand Down Expand Up @@ -637,4 +660,34 @@ protected final Message newMessage(
addProperties(msg, registrationInfo);
return msg;
}

/**
* Trigger the creation of a "connected" event.
*
* @param remoteId The remote ID
* @param authenticatedDevice The (optional) authenticated device.
*
* @see ConnectionEventProducer
* @see ConnectionEventProducer#connected(String, Device)
*/
protected void sendConnectedEvent(final String remoteId, final Device authenticatedDevice) {
if (this.connectionEventProducer != null) {
this.connectionEventProducer.connected(remoteId, authenticatedDevice);
}
}

/**
* Trigger the creation of a "disconnected" event.
*
* @param remoteId The remote ID.
* @param authenticatedDevice The (optional) authenticated device.
*
* @see ConnectionEventProducer
* @see ConnectionEventProducer#connected(String, Device)
*/
protected void sendDisconnectedEvent(final String remoteId, final Device authenticatedDevice) {
if (this.connectionEventProducer != null) {
this.connectionEventProducer.disconnected(remoteId, authenticatedDevice);
}
}
}
Loading

0 comments on commit 6b6c644

Please sign in to comment.