Skip to content

Commit

Permalink
[eclipse-hono#565] Implement protocol connection events
Browse files Browse the repository at this point in the history
  • Loading branch information
ctron committed Apr 11, 2018
1 parent e5e3fe8 commit e44ebcd
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@

package org.eclipse.hono.adapter.mqtt;

import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE;

import java.net.HttpURLConnection;
import java.util.Objects;

Expand Down Expand Up @@ -40,11 +44,13 @@

import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.mqtt.MqttAuth;
import io.vertx.mqtt.MqttConnectionException;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttServerOptions;
Expand Down Expand Up @@ -245,6 +251,35 @@ public void doStop(final Future<Void> stopFuture) {
.compose(d -> stopFuture.complete(), stopFuture);
}

/**
* Create a future indicating a rejected connection.
*
* @param returnCode The error code to return.
* @return A future indicating a rejected connection.
*/
protected static <T> Future<T> rejected ( final MqttConnectReturnCode returnCode ) {
return Future.failedFuture(new MqttConnectionException(returnCode != null ? returnCode : MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE ));
}

/**
* Create a future indicating an accepted connection with an authenticated device.
*
* @param authenticatedDevice The device that was authenticated, may be {@code null}
* @return A future indicating an accepted connection.
*/
protected static Future<Device> accepted ( final Device authenticatedDevice ) {
return Future.succeededFuture(authenticatedDevice);
}

/**
* Create a future indicating an accepted connection without an authenticated device.
*
* @return A future indicating an accepted connection.
*/
protected static Future<Device> accepted () {
return Future.succeededFuture(null);
}

/**
* Invoked when a client sends its <em>CONNECT</em> packet.
* <p>
Expand All @@ -256,19 +291,42 @@ final void handleEndpointConnection(final MqttEndpoint endpoint) {

LOG.debug("connection request from client [clientId: {}]", endpoint.clientIdentifier());

isConnected().map(ok -> {
if (getConfig().isAuthenticationRequired()) {
handleEndpointConnectionWithAuthentication(endpoint);
isConnected()
.compose( v -> handleConnectionRequest(endpoint))
.setHandler(result -> handleConnectionRequestResult ( endpoint, result ));

}

private Future<Device> handleConnectionRequest(final MqttEndpoint endpoint) {
if (getConfig().isAuthenticationRequired()) {
return handleEndpointConnectionWithAuthentication(endpoint);
} else {
return handleEndpointConnectionWithoutAuthentication(endpoint);
}
}

private void handleConnectionRequestResult ( final MqttEndpoint endpoint, final AsyncResult<Device> done) {

if ( done.succeeded() ) {

endpoint.accept(false);
sendConnectedEvent(endpoint.clientIdentifier(), done.result());

} else {

final Throwable t = done.cause();
if ( t instanceof MqttConnectionException ) {
final MqttConnectReturnCode code = ((MqttConnectionException) t).code();
LOG.debug("connection request from client [clientId: {}] rejected with code: {}",
endpoint.clientIdentifier(), code);
endpoint.reject(code);
} else {
handleEndpointConnectionWithoutAuthentication(endpoint);
LOG.debug("connection request from client [clientId: {}] rejected: {}",
endpoint.clientIdentifier(), MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
}
return null;
}).otherwise(t -> {
LOG.debug("connection request from client [clientId: {}] rejected: {}",
endpoint.clientIdentifier(), MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
return null;
});

}
}

/**
Expand All @@ -282,39 +340,42 @@ final void handleEndpointConnection(final MqttEndpoint endpoint) {
*
* @param endpoint The MQTT endpoint representing the client.
*/
private void handleEndpointConnectionWithoutAuthentication(final MqttEndpoint endpoint) {
private Future<Device> handleEndpointConnectionWithoutAuthentication(final MqttEndpoint endpoint) {

endpoint.closeHandler(v -> {
close(endpoint);
close(endpoint, null);
LOG.debug("connection to unauthenticated device [clientId: {}] closed", endpoint.clientIdentifier());
metrics.decrementUnauthenticatedMqttConnections();
});
endpoint.publishHandler(message -> onPublishedMessage(new MqttContext(message, endpoint)));

LOG.debug("unauthenticated device [clientId: {}] connected", endpoint.clientIdentifier());
endpoint.accept(false);
metrics.incrementUnauthenticatedMqttConnections();
return accepted();
}

private void handleEndpointConnectionWithAuthentication(final MqttEndpoint endpoint) {
private Future<Device> handleEndpointConnectionWithAuthentication(final MqttEndpoint endpoint) {

if (endpoint.auth() == null) {

LOG.debug("connection request from device [clientId: {}] rejected: {}",
endpoint.clientIdentifier(), "device did not provide credentials in CONNECT packet");
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);

return rejected(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);

} else {

final DeviceCredentials credentials = getCredentials(endpoint.auth());

if (credentials == null) {

LOG.debug("connection request from device [clientId: {}] rejected: {}",
endpoint.clientIdentifier(), "device provided malformed credentials in CONNECT packet");
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
return rejected(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);

} else {

getTenantConfiguration(credentials.getTenantId()).compose(tenantConfig -> {
return getTenantConfiguration(credentials.getTenantId()).compose(tenantConfig -> {
if (tenantConfig.isAdapterEnabled(getTypeName())) {
LOG.debug("protocol adapter [{}] is enabled for tenant [{}]",
getTypeName(), credentials.getTenantId());
Expand All @@ -329,39 +390,39 @@ private void handleEndpointConnectionWithAuthentication(final MqttEndpoint endpo
final Future<Device> result = Future.future();
usernamePasswordAuthProvider.authenticate(credentials, result.completer());
return result;
}).map(authenticatedDevice -> {
}).compose(authenticatedDevice -> {
LOG.debug("successfully authenticated device [tenant-id: {}, auth-id: {}, device-id: {}]",
authenticatedDevice.getTenantId(), credentials.getAuthId(),
authenticatedDevice.getDeviceId());
onAuthenticationSuccess(endpoint, authenticatedDevice);
return null;
}).otherwise(t -> {
return accepted ( authenticatedDevice );
}).recover(t -> {
LOG.debug("cannot authenticate device [tenant-id: {}, auth-id: {}]",
credentials.getTenantId(), credentials.getAuthId(), t);
if (ServerErrorException.class.isInstance(t)) {
if (t instanceof ServerErrorException) {
// one of the services we depend on might not be available (yet)
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
return rejected(CONNECTION_REFUSED_SERVER_UNAVAILABLE);
} else {
// validation of credentials has failed
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
return rejected(CONNECTION_REFUSED_NOT_AUTHORIZED);
}
return null;
});

}
}
}

private void onAuthenticationSuccess(final MqttEndpoint endpoint, final Device authenticatedDevice) {

endpoint.closeHandler(v -> {
close(endpoint);
close(endpoint, authenticatedDevice);
LOG.debug("connection to device [tenant-id: {}, device-id: {}] closed",
authenticatedDevice.getTenantId(), authenticatedDevice.getDeviceId());
metrics.decrementMqttConnections(authenticatedDevice.getTenantId());
sendDisconnectedEvent(endpoint.clientIdentifier(), authenticatedDevice);
});

endpoint.publishHandler(message -> onPublishedMessage(new MqttContext(message, endpoint, authenticatedDevice)));
endpoint.accept(false);
metrics.incrementMqttConnections(authenticatedDevice.getTenantId());
}

Expand Down Expand Up @@ -527,9 +588,11 @@ private Future<Void> uploadMessage(final MqttContext ctx, final String tenant, f
* Closes a connection to a client.
*
* @param endpoint The connection to close.
* @param authenticatedDevice Optional authenticated device information, may be {@code null}.
*/
protected final void close(final MqttEndpoint endpoint) {
protected final void close(final MqttEndpoint endpoint, final Device authenticatedDevice) {
onClose(endpoint);
sendDisconnectedEvent(endpoint.clientIdentifier(), authenticatedDevice);
if (endpoint.isConnected()) {
LOG.debug("closing connection with client [client ID: {}]", endpoint.clientIdentifier());
endpoint.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ public abstract class AbstractProtocolAdapterBase<T extends ProtocolAdapterPrope
private HonoClient tenantClient;
private HonoClient credentialsServiceClient;

private ConnectionEventProducer connectionEventProducer;

/**
* Sets the configuration by means of Spring dependency injection.
* <p>
Expand Down Expand Up @@ -171,6 +173,19 @@ public final HonoClient getCredentialsServiceClient() {
return credentialsServiceClient;
}

public void setConnectionEventProducer(final ConnectionEventProducer connectionEventProducer) {
this.connectionEventProducer = connectionEventProducer;
}

/**
* Gets the producer of connection events.
*
* @return The implementation for producing connection events. Maybe {@code null}.
*/
public ConnectionEventProducer getConnectionEventProducer() {
return this.connectionEventProducer;
}

/**
* Gets this adapter's type name.
* <p>
Expand Down Expand Up @@ -636,4 +651,16 @@ protected final Message newMessage(
addProperties(msg, registrationInfo);
return msg;
}

protected void sendConnectedEvent(final String connectionIdentifier, final Device authenticatedDevice) {
if (this.connectionEventProducer != null) {
this.connectionEventProducer.connected(connectionIdentifier, authenticatedDevice);
}
}

protected void sendDisconnectedEvent(final String connectionIdentifier, final Device authenticatedDevice) {
if (this.connectionEventProducer != null) {
this.connectionEventProducer.disconnected(connectionIdentifier, authenticatedDevice);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Copyright (c) 2018 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 1.0 which is available at
* https://www.eclipse.org/legal/epl-v10.html
*
* SPDX-License-Identifier: EPL-1.0
*/
package org.eclipse.hono.service;

import org.eclipse.hono.service.auth.device.Device;

/**
* Produces connection events.
* <p>
* The interface is intended to be implemented by setups which are interested in receiving technical connection events.
* Which might not make sense for all protocols adapters. But mostly for connection oriented ones like MQTT or AMQP 1.0.
* <p>
* The protocol adapters may call {@link #connected(String, Device)} and {@link #disconnected(String, Device)} as they
* see fit. The whole process is a "best effort" process and intended to be used for monitoring/debugging.
* <p>
* When a protocol adapter calls into this producer it must provide some kind of connection ID, which might have a
* different meaning for different protocol adapters. This should be as identifiable as possible but there is no
* requirement for this to be unique. However for each connection the protocol adapter must call
* {@link #disconnected(String, Device)} with the same information as it called {@link #connected(String, Device)}.
*/
public interface ConnectionEventProducer {

/**
* Produce an event for a new connection.
*
* @param connectionIdentifier The ID of the connection which connected.
* @param authenticatedDevice The optional authenticated device associated with the connection. May be {@code null}.
*/
public void connected(String connectionIdentifier, Device authenticatedDevice);

/**
* Produce an event for a closed connection.
*
* @param connectionIdentifier The ID of the connection which disconnected.
* @param authenticatedDevice The optional authenticated device associated with the connection. May be {@code null}.
*/
public void disconnected(String connectionIdentifier, Device authenticatedDevice);

}

0 comments on commit e44ebcd

Please sign in to comment.