Skip to content

Commit

Permalink
[eclipse-hono#1858] Use ttd value as command consumer lifespan.
Browse files Browse the repository at this point in the history
Also change the type of the lifespan parameter
from int to Duration in the methods of the
DeviceConnectionInfo, DeviceConnectionClient and
DeviceConnectionService interfaces.

Signed-off-by: Carsten Lohmann <[email protected]>
  • Loading branch information
calohmn committed Apr 23, 2020
1 parent 3dca60d commit 036decd
Show file tree
Hide file tree
Showing 30 changed files with 247 additions and 153 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -864,10 +864,10 @@ private Future<MessageConsumer> createCommandConsumer(
if (authenticatedDevice != null && !authenticatedDevice.getDeviceId().equals(sourceAddress.getResourceId())) {
// gateway scenario
return getCommandConsumerFactory().createCommandConsumer(sourceAddress.getTenantId(),
sourceAddress.getResourceId(), authenticatedDevice.getDeviceId(), commandHandler, span.context());
sourceAddress.getResourceId(), authenticatedDevice.getDeviceId(), commandHandler, null, span.context());
} else {
return getCommandConsumerFactory().createCommandConsumer(sourceAddress.getTenantId(),
sourceAddress.getResourceId(), commandHandler, span.context());
sourceAddress.getResourceId(), commandHandler, null, span.context());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ public void testAdapterOpensSenderLinkAndNotifyDownstreamApplication() {
// WHEN an unauthenticated device opens a receiver link with a valid source address
final ProtonConnection deviceConnection = mock(ProtonConnection.class);
when(deviceConnection.attachments()).thenReturn(mock(Record.class));
when(commandConsumerFactory.createCommandConsumer(eq(TEST_TENANT_ID), eq(TEST_DEVICE), any(Handler.class), any()))
when(commandConsumerFactory.createCommandConsumer(eq(TEST_TENANT_ID), eq(TEST_DEVICE), any(Handler.class), any(), any()))
.thenReturn(Future.succeededFuture(mock(MessageConsumer.class)));
final String sourceAddress = String.format("%s/%s/%s", getCommandEndpoint(), TEST_TENANT_ID, TEST_DEVICE);
final ProtonSender sender = getSender(sourceAddress);
Expand Down Expand Up @@ -488,7 +488,7 @@ public void testAdapterClosesCommandConsumerWhenDeviceClosesReceiverLink() {

// and a device that wants to receive commands
final MessageConsumer commandConsumer = mock(MessageConsumer.class);
when(commandConsumerFactory.createCommandConsumer(eq(TEST_TENANT_ID), eq(TEST_DEVICE), any(Handler.class), any()))
when(commandConsumerFactory.createCommandConsumer(eq(TEST_TENANT_ID), eq(TEST_DEVICE), any(Handler.class), any(), any()))
.thenReturn(Future.succeededFuture(commandConsumer));
final String sourceAddress = String.format("%s", getCommandEndpoint());
final ProtonSender sender = getSender(sourceAddress);
Expand Down Expand Up @@ -588,7 +588,7 @@ private void testAdapterClosesCommandConsumer(

// that wants to receive commands
final MessageConsumer commandConsumer = mock(MessageConsumer.class);
when(commandConsumerFactory.createCommandConsumer(eq(TEST_TENANT_ID), eq(TEST_DEVICE), any(Handler.class), any()))
when(commandConsumerFactory.createCommandConsumer(eq(TEST_TENANT_ID), eq(TEST_DEVICE), any(Handler.class), any(), any()))
.thenReturn(Future.succeededFuture(commandConsumer));
final String sourceAddress = getCommandEndpoint();
final ProtonSender sender = getSender(sourceAddress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.security.Principal;
import java.security.PrivateKey;
import java.security.cert.Certificate;
import java.time.Duration;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -983,12 +984,14 @@ protected final Future<MessageConsumer> createCommandConsumer(
deviceId,
gatewayId,
commandHandler,
Duration.ofSeconds(ttdSecs),
currentSpan.context());
} else {
commandConsumerFuture = getCommandConsumerFactory().createCommandConsumer(
tenantObject.getTenantId(),
deviceId,
commandHandler,
Duration.ofSeconds(ttdSecs),
currentSpan.context());
}
return commandConsumerFuture
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@ public void setup() {
commandConsumer = mock(MessageConsumer.class);
commandConsumerFactory = mock(ProtocolAdapterCommandConsumerFactory.class);
when(commandConsumerFactory.connect()).thenReturn(Future.succeededFuture(mock(HonoConnection.class)));
when(commandConsumerFactory.createCommandConsumer(anyString(), anyString(), any(Handler.class), any()))
when(commandConsumerFactory.createCommandConsumer(anyString(), anyString(), any(Handler.class), any(), any()))
.thenReturn(Future.succeededFuture(commandConsumer));
when(commandConsumerFactory.createCommandConsumer(anyString(), anyString(), anyString(), any(Handler.class), any()))
when(commandConsumerFactory.createCommandConsumer(anyString(), anyString(), anyString(), any(Handler.class), any(), any()))
.thenReturn(Future.succeededFuture(commandConsumer));

deviceConnectionClientFactory = mock(DeviceConnectionClientFactory.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package org.eclipse.hono.adapter.http;

import java.net.HttpURLConnection;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -1057,12 +1058,14 @@ protected final Future<MessageConsumer> createCommandConsumer(
deviceId,
gatewayId,
commandHandler,
Duration.ofSeconds(ttdSecs),
currentSpan.context());
} else {
commandConsumerFuture = getCommandConsumerFactory().createCommandConsumer(
tenantObject.getTenantId(),
deviceId,
commandHandler,
Duration.ofSeconds(ttdSecs),
currentSpan.context());
}
return commandConsumerFuture
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public void setup() {
}).when(commandConsumer).close(any(Handler.class));
commandConsumerFactory = mock(ProtocolAdapterCommandConsumerFactory.class);
when(commandConsumerFactory.connect()).thenReturn(Future.succeededFuture(mock(HonoConnection.class)));
when(commandConsumerFactory.createCommandConsumer(anyString(), anyString(), any(Handler.class), any()))
when(commandConsumerFactory.createCommandConsumer(anyString(), anyString(), any(Handler.class), any(), any()))
.thenReturn(Future.succeededFuture(commandConsumer));

commandTargetMapper = mock(CommandTargetMapper.class);
Expand Down Expand Up @@ -284,7 +284,7 @@ public void testUploadTelemetryFailsForDisabledTenant() {
// THEN the device gets a 403
assertContextFailedWithClientError(ctx, HttpURLConnection.HTTP_FORBIDDEN);
// and no Command consumer has been created for the device
verify(commandConsumerFactory, never()).createCommandConsumer(anyString(), anyString(), any(Handler.class), any());
verify(commandConsumerFactory, never()).createCommandConsumer(anyString(), anyString(), any(Handler.class), any(), any());
// and the message has not been forwarded downstream
verify(sender, never()).send(any(Message.class));
// and has not been reported as processed
Expand Down Expand Up @@ -337,7 +337,7 @@ public void testUploadTelemetryFailsForUnknownDevice() {
// and the message has not been forwarded downstream
verify(sender, never()).send(any(Message.class), any(SpanContext.class));
// and no Command consumer has been created for the device
verify(commandConsumerFactory, never()).createCommandConsumer(anyString(), anyString(), any(Handler.class), any());
verify(commandConsumerFactory, never()).createCommandConsumer(anyString(), anyString(), any(Handler.class), any(), any());
// and has not been reported as processed
verify(metrics, never())
.reportTelemetry(
Expand Down Expand Up @@ -742,7 +742,7 @@ public void testUploadTelemetryWithTtdClosesCommandConsumerIfSenderCreationFaile
new ServerErrorException(HttpURLConnection.HTTP_UNAVAILABLE, "not connected")));
// and the creation of the command consumer completes at a later point
final Promise<MessageConsumer> commandConsumerPromise = Promise.promise();
when(commandConsumerFactory.createCommandConsumer(anyString(), anyString(), any(Handler.class), any()))
when(commandConsumerFactory.createCommandConsumer(anyString(), anyString(), any(Handler.class), any(), any()))
.thenReturn(commandConsumerPromise.future());

adapter.uploadTelemetryMessage(ctx, "tenant", "device");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ public void setUp(final TestInfo testInfo) {
}
return null;
}).when(commandConsumer).close(any(Handler.class));
when(commandConsumerFactory.createCommandConsumer(anyString(), anyString(), any(Handler.class), any())).
when(commandConsumerFactory.createCommandConsumer(anyString(), anyString(), any(Handler.class), any(), any())).
thenReturn(Future.succeededFuture(commandConsumer));

telemetrySender = mock(DownstreamSender.class);
Expand Down Expand Up @@ -490,8 +490,8 @@ public void testPostTelemetryWithTtdSucceedsWithCommandInResponse(final VertxTes
final Command pendingCommand = Command.from(msg, "DEFAULT_TENANT", "device_1");
final CommandContext commandContext = CommandContext.from(pendingCommand, mock(ProtonDelivery.class), mock(Span.class));
final MessageConsumer commandConsumer = mock(MessageConsumer.class);
when(commandConsumerFactory.createCommandConsumer(eq("DEFAULT_TENANT"), eq("device_1"), any(Handler.class), any())).
thenAnswer(invocation -> {
when(commandConsumerFactory.createCommandConsumer(eq("DEFAULT_TENANT"), eq("device_1"), any(Handler.class), any(), any()))
.thenAnswer(invocation -> {
final Handler<CommandContext> consumer = invocation.getArgument(2);
consumer.handle(commandContext);
return Future.succeededFuture(commandConsumer);
Expand All @@ -518,7 +518,7 @@ public void testPostTelemetryWithTtdSucceedsWithCommandInResponse(final VertxTes
.sendJsonObject(new JsonObject(), ctx.succeeding(r -> {
ctx.verify(() -> {
verify(commandConsumerFactory).createCommandConsumer(eq("DEFAULT_TENANT"), eq("device_1"),
any(Handler.class), any());
any(Handler.class), any(), any());
// and the command consumer has been closed again
verify(commandConsumer).close(any());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ private Future<MessageConsumer> startLoraCommandConsumer(final String tenantId)
tenantId,
LORA_COMMAND_CONSUMER_DEVICE_ID,
receivedCommandContext -> commandConsumer(tenantId, receivedCommandContext),
null,
null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -774,10 +774,10 @@ private Future<MessageConsumer> createCommandConsumer(final MqttEndpoint mqttEnd
if (sub.isGatewaySubscriptionForSpecificDevice()) {
// gateway scenario
return getCommandConsumerFactory().createCommandConsumer(sub.getTenant(), sub.getDeviceId(),
sub.getAuthenticatedDeviceId(), commandHandler, span.context());
sub.getAuthenticatedDeviceId(), commandHandler, null, span.context());
} else {
return getCommandConsumerFactory().createCommandConsumer(sub.getTenant(), sub.getDeviceId(), commandHandler,
span.context());
null, span.context());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,7 @@ private void testOnSubscribeRegistersAndClosesConnection(final MqttQoS qos) {

// WHEN a device subscribes to commands
final MessageConsumer commandConsumer = mock(MessageConsumer.class);
when(commandConsumerFactory.createCommandConsumer(eq("tenant"), eq("deviceId"), any(Handler.class), any()))
when(commandConsumerFactory.createCommandConsumer(eq("tenant"), eq("deviceId"), any(Handler.class), any(), any()))
.thenReturn(Future.succeededFuture(commandConsumer));
final List<MqttTopicSubscription> subscriptions = Collections.singletonList(
newMockTopicSubscription(getCommandSubscriptionTopic("tenant", "deviceId"), qos));
Expand All @@ -929,7 +929,7 @@ private void testOnSubscribeRegistersAndClosesConnection(final MqttQoS qos) {
adapter.onSubscribe(endpoint, null, msg, cmdHandler, OptionalInt.empty());

// THEN the adapter creates a command consumer that is checked periodically
verify(commandConsumerFactory).createCommandConsumer(eq("tenant"), eq("deviceId"), any(Handler.class), any());
verify(commandConsumerFactory).createCommandConsumer(eq("tenant"), eq("deviceId"), any(Handler.class), any(), any());
// and the adapter registers a hook on the connection to the device
final ArgumentCaptor<Handler<Void>> closeHookCaptor = ArgumentCaptor.forClass(Handler.class);
verify(endpoint).closeHandler(closeHookCaptor.capture());
Expand Down Expand Up @@ -965,7 +965,7 @@ public void testOnSubscribeIncludesStatusCodeForEachFilter() {
subscriptions.add(newMockTopicSubscription("bumlux/+/+/#", MqttQoS.AT_MOST_ONCE));
subscriptions.add(newMockTopicSubscription("bumlux/+/+/#", MqttQoS.AT_MOST_ONCE));
// and for subscribing to commands
when(commandConsumerFactory.createCommandConsumer(eq("tenant-1"), eq("device-A"), any(Handler.class), any()))
when(commandConsumerFactory.createCommandConsumer(eq("tenant-1"), eq("device-A"), any(Handler.class), any(), any()))
.thenReturn(Future.succeededFuture(mock(MessageConsumer.class)));
subscriptions.add(
newMockTopicSubscription(getCommandSubscriptionTopic("tenant-1", "device-A"), MqttQoS.AT_MOST_ONCE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package org.eclipse.hono.deviceconnection.infinispan.client;

import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -116,8 +117,8 @@ public Future<JsonObject> getLastKnownGatewayForDevice(final String deviceId, fi

@Override
public Future<Void> setCommandHandlingAdapterInstance(final String deviceId, final String adapterInstanceId,
final int lifespanSeconds, final SpanContext context) {
return cache.setCommandHandlingAdapterInstance(tenantId, deviceId, adapterInstanceId, lifespanSeconds, context);
final Duration lifespan, final SpanContext context) {
return cache.setCommandHandlingAdapterInstance(tenantId, deviceId, adapterInstanceId, lifespan, context);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package org.eclipse.hono.deviceconnection.infinispan.client;

import java.net.HttpURLConnection;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
Expand Down Expand Up @@ -139,20 +140,23 @@ public Future<JsonObject> getLastKnownGatewayForDevice(final String tenantId, fi

@Override
public Future<Void> setCommandHandlingAdapterInstance(final String tenantId, final String deviceId,
final String adapterInstanceId, final int lifespanSeconds, final SpanContext context) {
final String adapterInstanceId, final Duration lifespan, final SpanContext context) {
Objects.requireNonNull(tenantId);
Objects.requireNonNull(deviceId);
Objects.requireNonNull(adapterInstanceId);

return cache.put(getAdapterInstanceEntryKey(tenantId, deviceId), adapterInstanceId, lifespanSeconds, TimeUnit.SECONDS)
// sanity check, preventing an ArithmeticException in lifespan.toMillis()
final long lifespanMillis = lifespan == null || lifespan.isNegative()
|| lifespan.getSeconds() > (Long.MAX_VALUE / 1000L) ? -1 : lifespan.toMillis();
return cache.put(getAdapterInstanceEntryKey(tenantId, deviceId), adapterInstanceId, lifespanMillis, TimeUnit.MILLISECONDS)
.map(replacedValue -> {
LOG.debug("set command handling adapter instance [tenant: {}, device-id: {}, adapter-instance: {}, lifespan: {}s]",
tenantId, deviceId, adapterInstanceId, lifespanSeconds);
LOG.debug("set command handling adapter instance [tenant: {}, device-id: {}, adapter-instance: {}, lifespan: {}ms]",
tenantId, deviceId, adapterInstanceId, lifespanMillis);
return (Void) null;
})
.recover(t -> {
LOG.debug("failed to set command handling adapter instance [tenant: {}, device-id: {}, adapter-instance: {}, lifespan: {}s]",
tenantId, deviceId, adapterInstanceId, lifespanSeconds, t);
LOG.debug("failed to set command handling adapter instance [tenant: {}, device-id: {}, adapter-instance: {}, lifespan: {}ms]",
tenantId, deviceId, adapterInstanceId, lifespanMillis, t);
return Future.failedFuture(new ServerErrorException(HttpURLConnection.HTTP_INTERNAL_ERROR, t));
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package org.eclipse.hono.deviceconnection.infinispan.client;

import java.time.Duration;
import java.util.Set;

import io.opentracing.SpanContext;
Expand Down Expand Up @@ -73,8 +74,8 @@ public interface DeviceConnectionInfo {
* @param tenantId The tenant id.
* @param deviceId The device id.
* @param adapterInstanceId The protocol adapter instance id.
* @param lifespanSeconds The lifespan of the mapping entry in seconds. A negative value is interpreted as an
* unlimited lifespan.
* @param lifespan The lifespan of the mapping entry. Using a negative duration or {@code null} here is
* interpreted as an unlimited lifespan.
* @param context The currently active OpenTracing span context or {@code null} if no span is currently active.
* Implementing classes should use this as the parent for any span they create for tracing
* the execution of this operation.
Expand All @@ -85,7 +86,7 @@ public interface DeviceConnectionInfo {
* @throws NullPointerException if any of the parameters except context is {@code null}.
*/
Future<Void> setCommandHandlingAdapterInstance(String tenantId, String deviceId, String adapterInstanceId,
int lifespanSeconds, SpanContext context);
Duration lifespan, SpanContext context);

/**
* Removes the mapping information that associates the given device with the given protocol adapter instance
Expand Down
Loading

0 comments on commit 036decd

Please sign in to comment.