From 036decd592b80dc161d294c926b5722f2eaf08f4 Mon Sep 17 00:00:00 2001 From: Carsten Lohmann Date: Wed, 22 Apr 2020 06:11:56 +0200 Subject: [PATCH] [#1858] Use ttd value as command consumer lifespan. Also change the type of the lifespan parameter from int to Duration in the methods of the DeviceConnectionInfo, DeviceConnectionClient and DeviceConnectionService interfaces. Signed-off-by: Carsten Lohmann --- .../impl/VertxBasedAmqpProtocolAdapter.java | 4 +- .../VertxBasedAmqpProtocolAdapterTest.java | 6 +- .../coap/AbstractVertxBasedCoapAdapter.java | 3 + .../AbstractVertxBasedCoapAdapterTest.java | 4 +- ...AbstractVertxBasedHttpProtocolAdapter.java | 3 + ...ractVertxBasedHttpProtocolAdapterTest.java | 8 +-- .../VertxBasedHttpProtocolAdapterTest.java | 8 +-- .../lora/impl/LoraProtocolAdapter.java | 1 + ...AbstractVertxBasedMqttProtocolAdapter.java | 4 +- ...ractVertxBasedMqttProtocolAdapterTest.java | 6 +- .../CacheBasedDeviceConnectionClient.java | 5 +- .../CacheBasedDeviceConnectionInfo.java | 16 +++-- .../client/DeviceConnectionInfo.java | 7 +- .../CacheBasedDeviceConnectionInfoTest.java | 52 +++++++------- .../hono/client/DeviceConnectionClient.java | 8 ++- ...ProtocolAdapterCommandConsumerFactory.java | 10 +++ .../impl/DeviceConnectionClientImpl.java | 4 +- ...ocolAdapterCommandConsumerFactoryImpl.java | 57 ++++++++++++---- .../impl/DeviceConnectionClientImplTest.java | 8 +-- ...AdapterCommandConsumerFactoryImplTest.java | 54 +++++++++++++-- .../org/eclipse/hono/util/MessageHelper.java | 2 +- .../service/AbstractProtocolAdapterBase.java | 1 + ...elegatingDeviceConnectionAmqpEndpoint.java | 11 +-- .../DeviceConnectionService.java | 8 ++- .../CacheBasedDeviceConnectionService.java | 5 +- ...CacheBasedDeviceConnectionServiceTest.java | 7 +- .../MapBasedDeviceConnectionService.java | 20 +++--- .../MapBasedDeviceConnectionServiceTest.java | 67 +++++++++---------- .../jms/JmsBasedDeviceConnectionClient.java | 4 +- .../registry/DeviceConnectionApiTests.java | 7 +- 30 files changed, 247 insertions(+), 153 deletions(-) diff --git a/adapters/amqp-vertx/src/main/java/org/eclipse/hono/adapter/amqp/impl/VertxBasedAmqpProtocolAdapter.java b/adapters/amqp-vertx/src/main/java/org/eclipse/hono/adapter/amqp/impl/VertxBasedAmqpProtocolAdapter.java index 54c8dec5e4b..9da98f346ad 100644 --- a/adapters/amqp-vertx/src/main/java/org/eclipse/hono/adapter/amqp/impl/VertxBasedAmqpProtocolAdapter.java +++ b/adapters/amqp-vertx/src/main/java/org/eclipse/hono/adapter/amqp/impl/VertxBasedAmqpProtocolAdapter.java @@ -864,10 +864,10 @@ private Future createCommandConsumer( if (authenticatedDevice != null && !authenticatedDevice.getDeviceId().equals(sourceAddress.getResourceId())) { // gateway scenario return getCommandConsumerFactory().createCommandConsumer(sourceAddress.getTenantId(), - sourceAddress.getResourceId(), authenticatedDevice.getDeviceId(), commandHandler, span.context()); + sourceAddress.getResourceId(), authenticatedDevice.getDeviceId(), commandHandler, null, span.context()); } else { return getCommandConsumerFactory().createCommandConsumer(sourceAddress.getTenantId(), - sourceAddress.getResourceId(), commandHandler, span.context()); + sourceAddress.getResourceId(), commandHandler, null, span.context()); } } diff --git a/adapters/amqp-vertx/src/test/java/org/eclipse/hono/adapter/amqp/impl/VertxBasedAmqpProtocolAdapterTest.java b/adapters/amqp-vertx/src/test/java/org/eclipse/hono/adapter/amqp/impl/VertxBasedAmqpProtocolAdapterTest.java index 2fad1523997..55da66f7351 100644 --- a/adapters/amqp-vertx/src/test/java/org/eclipse/hono/adapter/amqp/impl/VertxBasedAmqpProtocolAdapterTest.java +++ b/adapters/amqp-vertx/src/test/java/org/eclipse/hono/adapter/amqp/impl/VertxBasedAmqpProtocolAdapterTest.java @@ -455,7 +455,7 @@ public void testAdapterOpensSenderLinkAndNotifyDownstreamApplication() { // WHEN an unauthenticated device opens a receiver link with a valid source address final ProtonConnection deviceConnection = mock(ProtonConnection.class); when(deviceConnection.attachments()).thenReturn(mock(Record.class)); - when(commandConsumerFactory.createCommandConsumer(eq(TEST_TENANT_ID), eq(TEST_DEVICE), any(Handler.class), any())) + when(commandConsumerFactory.createCommandConsumer(eq(TEST_TENANT_ID), eq(TEST_DEVICE), any(Handler.class), any(), any())) .thenReturn(Future.succeededFuture(mock(MessageConsumer.class))); final String sourceAddress = String.format("%s/%s/%s", getCommandEndpoint(), TEST_TENANT_ID, TEST_DEVICE); final ProtonSender sender = getSender(sourceAddress); @@ -488,7 +488,7 @@ public void testAdapterClosesCommandConsumerWhenDeviceClosesReceiverLink() { // and a device that wants to receive commands final MessageConsumer commandConsumer = mock(MessageConsumer.class); - when(commandConsumerFactory.createCommandConsumer(eq(TEST_TENANT_ID), eq(TEST_DEVICE), any(Handler.class), any())) + when(commandConsumerFactory.createCommandConsumer(eq(TEST_TENANT_ID), eq(TEST_DEVICE), any(Handler.class), any(), any())) .thenReturn(Future.succeededFuture(commandConsumer)); final String sourceAddress = String.format("%s", getCommandEndpoint()); final ProtonSender sender = getSender(sourceAddress); @@ -588,7 +588,7 @@ private void testAdapterClosesCommandConsumer( // that wants to receive commands final MessageConsumer commandConsumer = mock(MessageConsumer.class); - when(commandConsumerFactory.createCommandConsumer(eq(TEST_TENANT_ID), eq(TEST_DEVICE), any(Handler.class), any())) + when(commandConsumerFactory.createCommandConsumer(eq(TEST_TENANT_ID), eq(TEST_DEVICE), any(Handler.class), any(), any())) .thenReturn(Future.succeededFuture(commandConsumer)); final String sourceAddress = getCommandEndpoint(); final ProtonSender sender = getSender(sourceAddress); diff --git a/adapters/coap-vertx-base/src/main/java/org/eclipse/hono/adapter/coap/AbstractVertxBasedCoapAdapter.java b/adapters/coap-vertx-base/src/main/java/org/eclipse/hono/adapter/coap/AbstractVertxBasedCoapAdapter.java index cea0034cbd5..1ad8a70053b 100644 --- a/adapters/coap-vertx-base/src/main/java/org/eclipse/hono/adapter/coap/AbstractVertxBasedCoapAdapter.java +++ b/adapters/coap-vertx-base/src/main/java/org/eclipse/hono/adapter/coap/AbstractVertxBasedCoapAdapter.java @@ -20,6 +20,7 @@ import java.security.Principal; import java.security.PrivateKey; import java.security.cert.Certificate; +import java.time.Duration; import java.util.HashSet; import java.util.Objects; import java.util.Optional; @@ -983,12 +984,14 @@ protected final Future createCommandConsumer( deviceId, gatewayId, commandHandler, + Duration.ofSeconds(ttdSecs), currentSpan.context()); } else { commandConsumerFuture = getCommandConsumerFactory().createCommandConsumer( tenantObject.getTenantId(), deviceId, commandHandler, + Duration.ofSeconds(ttdSecs), currentSpan.context()); } return commandConsumerFuture diff --git a/adapters/coap-vertx-base/src/test/java/org/eclipse/hono/adapter/coap/AbstractVertxBasedCoapAdapterTest.java b/adapters/coap-vertx-base/src/test/java/org/eclipse/hono/adapter/coap/AbstractVertxBasedCoapAdapterTest.java index f4ffc84cf3c..e25bea01428 100644 --- a/adapters/coap-vertx-base/src/test/java/org/eclipse/hono/adapter/coap/AbstractVertxBasedCoapAdapterTest.java +++ b/adapters/coap-vertx-base/src/test/java/org/eclipse/hono/adapter/coap/AbstractVertxBasedCoapAdapterTest.java @@ -158,9 +158,9 @@ public void setup() { commandConsumer = mock(MessageConsumer.class); commandConsumerFactory = mock(ProtocolAdapterCommandConsumerFactory.class); when(commandConsumerFactory.connect()).thenReturn(Future.succeededFuture(mock(HonoConnection.class))); - when(commandConsumerFactory.createCommandConsumer(anyString(), anyString(), any(Handler.class), any())) + when(commandConsumerFactory.createCommandConsumer(anyString(), anyString(), any(Handler.class), any(), any())) .thenReturn(Future.succeededFuture(commandConsumer)); - when(commandConsumerFactory.createCommandConsumer(anyString(), anyString(), anyString(), any(Handler.class), any())) + when(commandConsumerFactory.createCommandConsumer(anyString(), anyString(), anyString(), any(Handler.class), any(), any())) .thenReturn(Future.succeededFuture(commandConsumer)); deviceConnectionClientFactory = mock(DeviceConnectionClientFactory.class); diff --git a/adapters/http-vertx-base/src/main/java/org/eclipse/hono/adapter/http/AbstractVertxBasedHttpProtocolAdapter.java b/adapters/http-vertx-base/src/main/java/org/eclipse/hono/adapter/http/AbstractVertxBasedHttpProtocolAdapter.java index ec84579a070..bf115e092f4 100644 --- a/adapters/http-vertx-base/src/main/java/org/eclipse/hono/adapter/http/AbstractVertxBasedHttpProtocolAdapter.java +++ b/adapters/http-vertx-base/src/main/java/org/eclipse/hono/adapter/http/AbstractVertxBasedHttpProtocolAdapter.java @@ -14,6 +14,7 @@ package org.eclipse.hono.adapter.http; import java.net.HttpURLConnection; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -1057,12 +1058,14 @@ protected final Future createCommandConsumer( deviceId, gatewayId, commandHandler, + Duration.ofSeconds(ttdSecs), currentSpan.context()); } else { commandConsumerFuture = getCommandConsumerFactory().createCommandConsumer( tenantObject.getTenantId(), deviceId, commandHandler, + Duration.ofSeconds(ttdSecs), currentSpan.context()); } return commandConsumerFuture diff --git a/adapters/http-vertx-base/src/test/java/org/eclipse/hono/adapter/http/AbstractVertxBasedHttpProtocolAdapterTest.java b/adapters/http-vertx-base/src/test/java/org/eclipse/hono/adapter/http/AbstractVertxBasedHttpProtocolAdapterTest.java index 866ffa5dfc7..889c40e1b48 100644 --- a/adapters/http-vertx-base/src/test/java/org/eclipse/hono/adapter/http/AbstractVertxBasedHttpProtocolAdapterTest.java +++ b/adapters/http-vertx-base/src/test/java/org/eclipse/hono/adapter/http/AbstractVertxBasedHttpProtocolAdapterTest.java @@ -174,7 +174,7 @@ public void setup() { }).when(commandConsumer).close(any(Handler.class)); commandConsumerFactory = mock(ProtocolAdapterCommandConsumerFactory.class); when(commandConsumerFactory.connect()).thenReturn(Future.succeededFuture(mock(HonoConnection.class))); - when(commandConsumerFactory.createCommandConsumer(anyString(), anyString(), any(Handler.class), any())) + when(commandConsumerFactory.createCommandConsumer(anyString(), anyString(), any(Handler.class), any(), any())) .thenReturn(Future.succeededFuture(commandConsumer)); commandTargetMapper = mock(CommandTargetMapper.class); @@ -284,7 +284,7 @@ public void testUploadTelemetryFailsForDisabledTenant() { // THEN the device gets a 403 assertContextFailedWithClientError(ctx, HttpURLConnection.HTTP_FORBIDDEN); // and no Command consumer has been created for the device - verify(commandConsumerFactory, never()).createCommandConsumer(anyString(), anyString(), any(Handler.class), any()); + verify(commandConsumerFactory, never()).createCommandConsumer(anyString(), anyString(), any(Handler.class), any(), any()); // and the message has not been forwarded downstream verify(sender, never()).send(any(Message.class)); // and has not been reported as processed @@ -337,7 +337,7 @@ public void testUploadTelemetryFailsForUnknownDevice() { // and the message has not been forwarded downstream verify(sender, never()).send(any(Message.class), any(SpanContext.class)); // and no Command consumer has been created for the device - verify(commandConsumerFactory, never()).createCommandConsumer(anyString(), anyString(), any(Handler.class), any()); + verify(commandConsumerFactory, never()).createCommandConsumer(anyString(), anyString(), any(Handler.class), any(), any()); // and has not been reported as processed verify(metrics, never()) .reportTelemetry( @@ -742,7 +742,7 @@ public void testUploadTelemetryWithTtdClosesCommandConsumerIfSenderCreationFaile new ServerErrorException(HttpURLConnection.HTTP_UNAVAILABLE, "not connected"))); // and the creation of the command consumer completes at a later point final Promise commandConsumerPromise = Promise.promise(); - when(commandConsumerFactory.createCommandConsumer(anyString(), anyString(), any(Handler.class), any())) + when(commandConsumerFactory.createCommandConsumer(anyString(), anyString(), any(Handler.class), any(), any())) .thenReturn(commandConsumerPromise.future()); adapter.uploadTelemetryMessage(ctx, "tenant", "device"); diff --git a/adapters/http-vertx/src/test/java/org/eclipse/hono/adapter/http/impl/VertxBasedHttpProtocolAdapterTest.java b/adapters/http-vertx/src/test/java/org/eclipse/hono/adapter/http/impl/VertxBasedHttpProtocolAdapterTest.java index ff1971508a2..01ef076e622 100644 --- a/adapters/http-vertx/src/test/java/org/eclipse/hono/adapter/http/impl/VertxBasedHttpProtocolAdapterTest.java +++ b/adapters/http-vertx/src/test/java/org/eclipse/hono/adapter/http/impl/VertxBasedHttpProtocolAdapterTest.java @@ -237,7 +237,7 @@ public void setUp(final TestInfo testInfo) { } return null; }).when(commandConsumer).close(any(Handler.class)); - when(commandConsumerFactory.createCommandConsumer(anyString(), anyString(), any(Handler.class), any())). + when(commandConsumerFactory.createCommandConsumer(anyString(), anyString(), any(Handler.class), any(), any())). thenReturn(Future.succeededFuture(commandConsumer)); telemetrySender = mock(DownstreamSender.class); @@ -490,8 +490,8 @@ public void testPostTelemetryWithTtdSucceedsWithCommandInResponse(final VertxTes final Command pendingCommand = Command.from(msg, "DEFAULT_TENANT", "device_1"); final CommandContext commandContext = CommandContext.from(pendingCommand, mock(ProtonDelivery.class), mock(Span.class)); final MessageConsumer commandConsumer = mock(MessageConsumer.class); - when(commandConsumerFactory.createCommandConsumer(eq("DEFAULT_TENANT"), eq("device_1"), any(Handler.class), any())). - thenAnswer(invocation -> { + when(commandConsumerFactory.createCommandConsumer(eq("DEFAULT_TENANT"), eq("device_1"), any(Handler.class), any(), any())) + .thenAnswer(invocation -> { final Handler consumer = invocation.getArgument(2); consumer.handle(commandContext); return Future.succeededFuture(commandConsumer); @@ -518,7 +518,7 @@ public void testPostTelemetryWithTtdSucceedsWithCommandInResponse(final VertxTes .sendJsonObject(new JsonObject(), ctx.succeeding(r -> { ctx.verify(() -> { verify(commandConsumerFactory).createCommandConsumer(eq("DEFAULT_TENANT"), eq("device_1"), - any(Handler.class), any()); + any(Handler.class), any(), any()); // and the command consumer has been closed again verify(commandConsumer).close(any()); }); diff --git a/adapters/lora-vertx/src/main/java/org/eclipse/hono/adapter/lora/impl/LoraProtocolAdapter.java b/adapters/lora-vertx/src/main/java/org/eclipse/hono/adapter/lora/impl/LoraProtocolAdapter.java index fac77d7a0cd..3c2ffaf5d0a 100644 --- a/adapters/lora-vertx/src/main/java/org/eclipse/hono/adapter/lora/impl/LoraProtocolAdapter.java +++ b/adapters/lora-vertx/src/main/java/org/eclipse/hono/adapter/lora/impl/LoraProtocolAdapter.java @@ -335,6 +335,7 @@ private Future startLoraCommandConsumer(final String tenantId) tenantId, LORA_COMMAND_CONSUMER_DEVICE_ID, receivedCommandContext -> commandConsumer(tenantId, receivedCommandContext), + null, null); } diff --git a/adapters/mqtt-vertx-base/src/main/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.java b/adapters/mqtt-vertx-base/src/main/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.java index 7b05f7de530..93dc8f1de12 100644 --- a/adapters/mqtt-vertx-base/src/main/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.java +++ b/adapters/mqtt-vertx-base/src/main/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.java @@ -774,10 +774,10 @@ private Future createCommandConsumer(final MqttEndpoint mqttEnd if (sub.isGatewaySubscriptionForSpecificDevice()) { // gateway scenario return getCommandConsumerFactory().createCommandConsumer(sub.getTenant(), sub.getDeviceId(), - sub.getAuthenticatedDeviceId(), commandHandler, span.context()); + sub.getAuthenticatedDeviceId(), commandHandler, null, span.context()); } else { return getCommandConsumerFactory().createCommandConsumer(sub.getTenant(), sub.getDeviceId(), commandHandler, - span.context()); + null, span.context()); } } diff --git a/adapters/mqtt-vertx-base/src/test/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapterTest.java b/adapters/mqtt-vertx-base/src/test/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapterTest.java index 1e976113ca1..eba960103ea 100644 --- a/adapters/mqtt-vertx-base/src/test/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapterTest.java +++ b/adapters/mqtt-vertx-base/src/test/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapterTest.java @@ -915,7 +915,7 @@ private void testOnSubscribeRegistersAndClosesConnection(final MqttQoS qos) { // WHEN a device subscribes to commands final MessageConsumer commandConsumer = mock(MessageConsumer.class); - when(commandConsumerFactory.createCommandConsumer(eq("tenant"), eq("deviceId"), any(Handler.class), any())) + when(commandConsumerFactory.createCommandConsumer(eq("tenant"), eq("deviceId"), any(Handler.class), any(), any())) .thenReturn(Future.succeededFuture(commandConsumer)); final List subscriptions = Collections.singletonList( newMockTopicSubscription(getCommandSubscriptionTopic("tenant", "deviceId"), qos)); @@ -929,7 +929,7 @@ private void testOnSubscribeRegistersAndClosesConnection(final MqttQoS qos) { adapter.onSubscribe(endpoint, null, msg, cmdHandler, OptionalInt.empty()); // THEN the adapter creates a command consumer that is checked periodically - verify(commandConsumerFactory).createCommandConsumer(eq("tenant"), eq("deviceId"), any(Handler.class), any()); + verify(commandConsumerFactory).createCommandConsumer(eq("tenant"), eq("deviceId"), any(Handler.class), any(), any()); // and the adapter registers a hook on the connection to the device final ArgumentCaptor> closeHookCaptor = ArgumentCaptor.forClass(Handler.class); verify(endpoint).closeHandler(closeHookCaptor.capture()); @@ -965,7 +965,7 @@ public void testOnSubscribeIncludesStatusCodeForEachFilter() { subscriptions.add(newMockTopicSubscription("bumlux/+/+/#", MqttQoS.AT_MOST_ONCE)); subscriptions.add(newMockTopicSubscription("bumlux/+/+/#", MqttQoS.AT_MOST_ONCE)); // and for subscribing to commands - when(commandConsumerFactory.createCommandConsumer(eq("tenant-1"), eq("device-A"), any(Handler.class), any())) + when(commandConsumerFactory.createCommandConsumer(eq("tenant-1"), eq("device-A"), any(Handler.class), any(), any())) .thenReturn(Future.succeededFuture(mock(MessageConsumer.class))); subscriptions.add( newMockTopicSubscription(getCommandSubscriptionTopic("tenant-1", "device-A"), MqttQoS.AT_MOST_ONCE)); diff --git a/client-device-connection-infinispan/src/main/java/org/eclipse/hono/deviceconnection/infinispan/client/CacheBasedDeviceConnectionClient.java b/client-device-connection-infinispan/src/main/java/org/eclipse/hono/deviceconnection/infinispan/client/CacheBasedDeviceConnectionClient.java index b29a89e48bf..365ce8c8734 100644 --- a/client-device-connection-infinispan/src/main/java/org/eclipse/hono/deviceconnection/infinispan/client/CacheBasedDeviceConnectionClient.java +++ b/client-device-connection-infinispan/src/main/java/org/eclipse/hono/deviceconnection/infinispan/client/CacheBasedDeviceConnectionClient.java @@ -14,6 +14,7 @@ package org.eclipse.hono.deviceconnection.infinispan.client; +import java.time.Duration; import java.util.HashSet; import java.util.List; import java.util.Objects; @@ -116,8 +117,8 @@ public Future getLastKnownGatewayForDevice(final String deviceId, fi @Override public Future setCommandHandlingAdapterInstance(final String deviceId, final String adapterInstanceId, - final int lifespanSeconds, final SpanContext context) { - return cache.setCommandHandlingAdapterInstance(tenantId, deviceId, adapterInstanceId, lifespanSeconds, context); + final Duration lifespan, final SpanContext context) { + return cache.setCommandHandlingAdapterInstance(tenantId, deviceId, adapterInstanceId, lifespan, context); } @Override diff --git a/client-device-connection-infinispan/src/main/java/org/eclipse/hono/deviceconnection/infinispan/client/CacheBasedDeviceConnectionInfo.java b/client-device-connection-infinispan/src/main/java/org/eclipse/hono/deviceconnection/infinispan/client/CacheBasedDeviceConnectionInfo.java index c91517b3a7e..cbe253fbe2d 100644 --- a/client-device-connection-infinispan/src/main/java/org/eclipse/hono/deviceconnection/infinispan/client/CacheBasedDeviceConnectionInfo.java +++ b/client-device-connection-infinispan/src/main/java/org/eclipse/hono/deviceconnection/infinispan/client/CacheBasedDeviceConnectionInfo.java @@ -14,6 +14,7 @@ package org.eclipse.hono.deviceconnection.infinispan.client; import java.net.HttpURLConnection; +import java.time.Duration; import java.util.ArrayList; import java.util.HashSet; import java.util.Map; @@ -139,20 +140,23 @@ public Future getLastKnownGatewayForDevice(final String tenantId, fi @Override public Future setCommandHandlingAdapterInstance(final String tenantId, final String deviceId, - final String adapterInstanceId, final int lifespanSeconds, final SpanContext context) { + final String adapterInstanceId, final Duration lifespan, final SpanContext context) { Objects.requireNonNull(tenantId); Objects.requireNonNull(deviceId); Objects.requireNonNull(adapterInstanceId); - return cache.put(getAdapterInstanceEntryKey(tenantId, deviceId), adapterInstanceId, lifespanSeconds, TimeUnit.SECONDS) + // sanity check, preventing an ArithmeticException in lifespan.toMillis() + final long lifespanMillis = lifespan == null || lifespan.isNegative() + || lifespan.getSeconds() > (Long.MAX_VALUE / 1000L) ? -1 : lifespan.toMillis(); + return cache.put(getAdapterInstanceEntryKey(tenantId, deviceId), adapterInstanceId, lifespanMillis, TimeUnit.MILLISECONDS) .map(replacedValue -> { - LOG.debug("set command handling adapter instance [tenant: {}, device-id: {}, adapter-instance: {}, lifespan: {}s]", - tenantId, deviceId, adapterInstanceId, lifespanSeconds); + LOG.debug("set command handling adapter instance [tenant: {}, device-id: {}, adapter-instance: {}, lifespan: {}ms]", + tenantId, deviceId, adapterInstanceId, lifespanMillis); return (Void) null; }) .recover(t -> { - LOG.debug("failed to set command handling adapter instance [tenant: {}, device-id: {}, adapter-instance: {}, lifespan: {}s]", - tenantId, deviceId, adapterInstanceId, lifespanSeconds, t); + LOG.debug("failed to set command handling adapter instance [tenant: {}, device-id: {}, adapter-instance: {}, lifespan: {}ms]", + tenantId, deviceId, adapterInstanceId, lifespanMillis, t); return Future.failedFuture(new ServerErrorException(HttpURLConnection.HTTP_INTERNAL_ERROR, t)); }); } diff --git a/client-device-connection-infinispan/src/main/java/org/eclipse/hono/deviceconnection/infinispan/client/DeviceConnectionInfo.java b/client-device-connection-infinispan/src/main/java/org/eclipse/hono/deviceconnection/infinispan/client/DeviceConnectionInfo.java index 00b94815b58..823741275ec 100644 --- a/client-device-connection-infinispan/src/main/java/org/eclipse/hono/deviceconnection/infinispan/client/DeviceConnectionInfo.java +++ b/client-device-connection-infinispan/src/main/java/org/eclipse/hono/deviceconnection/infinispan/client/DeviceConnectionInfo.java @@ -13,6 +13,7 @@ package org.eclipse.hono.deviceconnection.infinispan.client; +import java.time.Duration; import java.util.Set; import io.opentracing.SpanContext; @@ -73,8 +74,8 @@ public interface DeviceConnectionInfo { * @param tenantId The tenant id. * @param deviceId The device id. * @param adapterInstanceId The protocol adapter instance id. - * @param lifespanSeconds The lifespan of the mapping entry in seconds. A negative value is interpreted as an - * unlimited lifespan. + * @param lifespan The lifespan of the mapping entry. Using a negative duration or {@code null} here is + * interpreted as an unlimited lifespan. * @param context The currently active OpenTracing span context or {@code null} if no span is currently active. * Implementing classes should use this as the parent for any span they create for tracing * the execution of this operation. @@ -85,7 +86,7 @@ public interface DeviceConnectionInfo { * @throws NullPointerException if any of the parameters except context is {@code null}. */ Future setCommandHandlingAdapterInstance(String tenantId, String deviceId, String adapterInstanceId, - int lifespanSeconds, SpanContext context); + Duration lifespan, SpanContext context); /** * Removes the mapping information that associates the given device with the given protocol adapter instance diff --git a/client-device-connection-infinispan/src/test/java/org/eclipse/hono/deviceconnection/infinispan/client/CacheBasedDeviceConnectionInfoTest.java b/client-device-connection-infinispan/src/test/java/org/eclipse/hono/deviceconnection/infinispan/client/CacheBasedDeviceConnectionInfoTest.java index b872ccffb86..ce05e2c7775 100644 --- a/client-device-connection-infinispan/src/test/java/org/eclipse/hono/deviceconnection/infinispan/client/CacheBasedDeviceConnectionInfoTest.java +++ b/client-device-connection-infinispan/src/test/java/org/eclipse/hono/deviceconnection/infinispan/client/CacheBasedDeviceConnectionInfoTest.java @@ -19,7 +19,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -27,6 +26,7 @@ import java.io.IOException; import java.net.HttpURLConnection; +import java.time.Duration; import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -197,7 +197,7 @@ void testGetLastKnownGatewayFails(final VertxTestContext ctx) { public void testSetCommandHandlingAdapterInstanceSucceeds(final VertxTestContext ctx) { final String deviceId = "testDevice"; final String adapterInstance = "adapterInstance"; - info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, -1, spanContext) + info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, spanContext) .setHandler(ctx.succeeding(result -> ctx.completeNow())); } @@ -211,7 +211,7 @@ public void testSetCommandHandlingAdapterInstanceSucceeds(final VertxTestContext public void testSetCommandHandlingAdapterInstanceWithLifespanSucceeds(final VertxTestContext ctx) { final String deviceId = "testDevice"; final String adapterInstance = "adapterInstance"; - info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, 10, spanContext) + info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, Duration.ofSeconds(10), spanContext) .setHandler(ctx.succeeding(result -> ctx.completeNow())); } @@ -224,7 +224,7 @@ public void testSetCommandHandlingAdapterInstanceWithLifespanSucceeds(final Vert public void testRemoveCommandHandlingAdapterInstanceSucceeds(final VertxTestContext ctx) { final String deviceId = "testDevice"; final String adapterInstance = "adapterInstance"; - info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, -1, spanContext) + info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, spanContext) .compose(v -> info.removeCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, spanContext)) .setHandler(ctx.succeeding(result -> ctx.completeNow())); @@ -241,7 +241,7 @@ public void testRemoveCommandHandlingAdapterInstanceSucceeds(final VertxTestCont public void testRemoveCommandHandlingAdapterInstanceFailsForOtherDevice(final VertxTestContext ctx) { final String deviceId = "testDevice"; final String adapterInstance = "adapterInstance"; - info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, -1, spanContext) + info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, spanContext) .compose(v -> { return info.removeCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, "otherDevice", adapterInstance, spanContext); }).setHandler(ctx.failing(t -> ctx.verify(() -> { @@ -261,7 +261,7 @@ public void testRemoveCommandHandlingAdapterInstanceFailsForOtherDevice(final Ve public void testRemoveCommandHandlingAdapterInstanceFailsForOtherAdapterInstance(final VertxTestContext ctx) { final String deviceId = "testDevice"; final String adapterInstance = "adapterInstance"; - info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, -1, spanContext) + info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, spanContext) .compose(v -> { return info.removeCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, "otherAdapterInstance", spanContext); }).setHandler(ctx.failing(t -> ctx.verify(() -> { @@ -281,7 +281,7 @@ public void testRemoveCommandHandlingAdapterInstanceFailsForOtherAdapterInstance public void testGetCommandHandlingAdapterInstancesForDevice(final VertxTestContext ctx) { final String deviceId = "testDevice"; final String adapterInstance = "adapterInstance"; - info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, -1, spanContext) + info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, spanContext) .compose(v -> { return info.getCommandHandlingAdapterInstances(Constants.DEFAULT_TENANT, deviceId, Collections.emptySet(), spanContext); }).setHandler(ctx.succeeding(result -> ctx.verify(() -> { @@ -306,13 +306,7 @@ public void testGetCommandHandlingAdapterInstancesWithExpiredEntry(final Vertx v final Cache mockedCache = spy(cache); info = new CacheBasedDeviceConnectionInfo(mockedCache, tracer); - // for the sake of the test, always let a lifespan of just 1ms be set on the cache - when(mockedCache.put(anyString(), anyString(), anyLong(), any(TimeUnit.class))).thenAnswer(invocation -> { - final String key = invocation.getArgument(0); - final String value = invocation.getArgument(1); - return cache.put(key, value, 1, TimeUnit.MILLISECONDS); - }); - info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, 10, spanContext) + info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, Duration.ofMillis(1), spanContext) .compose(v -> { final Promise instancesPromise = Promise.promise(); // wait 2ms to make sure entry has expired after that @@ -365,10 +359,10 @@ public void testGetCommandHandlingAdapterInstancesForLastKnownGateway(final Set< final Set viaGateways = new HashSet<>(Set.of(gatewayId, otherGatewayId)); viaGateways.addAll(extraUnusedViaGateways); // set command handling adapter instance for gateway - info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, -1, spanContext) + info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, spanContext) .compose(v -> { // set command handling adapter instance for other gateway - return info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, otherGatewayId, otherAdapterInstance, -1, spanContext); + return info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, otherGatewayId, otherAdapterInstance, null, spanContext); }).compose(deviceConnectionResult -> { return info.setLastKnownGatewayForDevice(Constants.DEFAULT_TENANT, deviceId, gatewayId, spanContext); }).compose(u -> { @@ -402,10 +396,10 @@ public void testGetCommandHandlingAdapterInstancesWithMultiResultAndLastKnownGat final Set viaGateways = new HashSet<>(Set.of(gatewayId, otherGatewayId)); viaGateways.addAll(extraUnusedViaGateways); // set command handling adapter instance for gateway - info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, -1, spanContext) + info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, spanContext) .compose(v -> { // set command handling adapter instance for other gateway - return info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, otherGatewayId, otherAdapterInstance, -1, spanContext); + return info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, otherGatewayId, otherAdapterInstance, null, spanContext); }).compose(deviceConnectionResult -> { return info.setLastKnownGatewayForDevice(Constants.DEFAULT_TENANT, deviceId, gatewayIdNotInVia, spanContext); }).compose(u -> { @@ -440,10 +434,10 @@ public void testGetCommandHandlingAdapterInstancesWithMultiResultAndNoAdapterFor final Set viaGateways = new HashSet<>(Set.of(gatewayId, otherGatewayId, gatewayWithNoAdapterInstance)); viaGateways.addAll(extraUnusedViaGateways); // set command handling adapter instance for gateway - info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, -1, spanContext) + info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, spanContext) .compose(v -> { // set command handling adapter instance for other gateway - return info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, otherGatewayId, otherAdapterInstance, -1, spanContext); + return info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, otherGatewayId, otherAdapterInstance, null, spanContext); }).compose(deviceConnectionResult -> { return info.setLastKnownGatewayForDevice(Constants.DEFAULT_TENANT, deviceId, gatewayWithNoAdapterInstance, spanContext); }).compose(u -> { @@ -475,7 +469,7 @@ public void testGetCommandHandlingAdapterInstancesForSingleResultAndLastKnownGat final Set viaGateways = new HashSet<>(Set.of("otherGatewayId")); viaGateways.addAll(extraUnusedViaGateways); // set command handling adapter instance for gateway - info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, -1, spanContext) + info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, spanContext) .compose(deviceConnectionResult -> { return info.setLastKnownGatewayForDevice(Constants.DEFAULT_TENANT, deviceId, gatewayId, spanContext); }).compose(u -> { @@ -505,10 +499,10 @@ public void testGetCommandHandlingAdapterInstancesWithLastKnownGatewayIsGivingDe final Set viaGateways = new HashSet<>(Set.of(gatewayId)); viaGateways.addAll(extraUnusedViaGateways); // set command handling adapter instance for device - info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, -1, spanContext) + info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, spanContext) .compose(v -> { // set command handling adapter instance for other gateway - return info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, otherAdapterInstance, -1, spanContext); + return info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, otherAdapterInstance, null, spanContext); }).compose(u -> { return info.setLastKnownGatewayForDevice(Constants.DEFAULT_TENANT, deviceId, gatewayId, spanContext); }).compose(w -> { @@ -540,10 +534,10 @@ public void testGetCommandHandlingAdapterInstancesWithoutLastKnownGatewayIsGivin final Set viaGateways = new HashSet<>(Set.of(gatewayId)); viaGateways.addAll(extraUnusedViaGateways); // set command handling adapter instance for device - info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, -1, spanContext) + info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, spanContext) .compose(v -> { // set command handling adapter instance for other gateway - return info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, otherAdapterInstance, -1, spanContext); + return info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, otherAdapterInstance, null, spanContext); }).compose(w -> { return info.getCommandHandlingAdapterInstances(Constants.DEFAULT_TENANT, deviceId, viaGateways, spanContext); }).setHandler(ctx.succeeding(result -> ctx.verify(() -> { @@ -572,7 +566,7 @@ public void testGetCommandHandlingAdapterInstancesForOneSubscribedVia(final Set< final Set viaGateways = new HashSet<>(Set.of(gatewayId, otherGatewayId)); viaGateways.addAll(extraUnusedViaGateways); // set command handling adapter instance for gateway - info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, -1, spanContext) + info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, spanContext) .compose(v -> { return info.getCommandHandlingAdapterInstances(Constants.DEFAULT_TENANT, deviceId, viaGateways, spanContext); }).setHandler(ctx.succeeding(result -> ctx.verify(() -> { @@ -601,10 +595,10 @@ public void testGetCommandHandlingAdapterInstancesForMultipleSubscribedVias(fina final Set viaGateways = new HashSet<>(Set.of(gatewayId, otherGatewayId)); viaGateways.addAll(extraUnusedViaGateways); // set command handling adapter instance for gateway - info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, -1, spanContext) + info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, spanContext) .compose(v -> { // set command handling adapter instance for other gateway - return info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, otherGatewayId, otherAdapterInstance, -1, spanContext); + return info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, otherGatewayId, otherAdapterInstance, null, spanContext); }).compose(u -> { return info.getCommandHandlingAdapterInstances(Constants.DEFAULT_TENANT, deviceId, viaGateways, spanContext); }).setHandler(ctx.succeeding(result -> ctx.verify(() -> { @@ -655,7 +649,7 @@ public void testGetCommandHandlingAdapterInstancesFailsForOtherTenantDevice(fina final Set viaGateways = new HashSet<>(Set.of(gatewayId)); viaGateways.addAll(extraUnusedViaGateways); // set command handling adapter instance for other gateway - info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, otherGatewayId, adapterInstance, -1, spanContext) + info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, otherGatewayId, adapterInstance, null, spanContext) .compose(v -> { return info.getCommandHandlingAdapterInstances(Constants.DEFAULT_TENANT, deviceId, viaGateways, spanContext); }).setHandler(ctx.failing(t -> ctx.verify(() -> { diff --git a/client/src/main/java/org/eclipse/hono/client/DeviceConnectionClient.java b/client/src/main/java/org/eclipse/hono/client/DeviceConnectionClient.java index e7219facf72..ab5e4a91ee8 100644 --- a/client/src/main/java/org/eclipse/hono/client/DeviceConnectionClient.java +++ b/client/src/main/java/org/eclipse/hono/client/DeviceConnectionClient.java @@ -13,6 +13,7 @@ package org.eclipse.hono.client; +import java.time.Duration; import java.util.List; import io.opentracing.SpanContext; @@ -72,15 +73,16 @@ public interface DeviceConnectionClient extends RequestResponseClient { * * @param deviceId The device id. * @param adapterInstanceId The protocol adapter instance id. - * @param lifespanSeconds The lifespan of the mapping entry in seconds. A negative value is interpreted as an - * unlimited lifespan. + * @param lifespan The lifespan of the mapping entry. Using a negative duration or {@code null} here is + * interpreted as an unlimited lifespan. Only the number of seconds in the given duration + * will be taken into account. * @param context The currently active OpenTracing span context or {@code null} if no span is currently active. * An implementation should use this as the parent for any span it creates for tracing * the execution of this operation. * @return A future indicating whether the operation succeeded or not. * @throws NullPointerException if device id or adapter instance id is {@code null}. */ - Future setCommandHandlingAdapterInstance(String deviceId, String adapterInstanceId, int lifespanSeconds, + Future setCommandHandlingAdapterInstance(String deviceId, String adapterInstanceId, Duration lifespan, SpanContext context); /** diff --git a/client/src/main/java/org/eclipse/hono/client/ProtocolAdapterCommandConsumerFactory.java b/client/src/main/java/org/eclipse/hono/client/ProtocolAdapterCommandConsumerFactory.java index d4e98590b0c..4313b2be5ea 100644 --- a/client/src/main/java/org/eclipse/hono/client/ProtocolAdapterCommandConsumerFactory.java +++ b/client/src/main/java/org/eclipse/hono/client/ProtocolAdapterCommandConsumerFactory.java @@ -13,6 +13,8 @@ package org.eclipse.hono.client; +import java.time.Duration; + import org.eclipse.hono.client.impl.CommandConsumer; import org.eclipse.hono.client.impl.ProtocolAdapterCommandConsumerFactoryImpl; @@ -67,6 +69,9 @@ void initialize(CommandTargetMapper commandTargetMapper, * @param commandHandler The handler to invoke with every command received. The handler must invoke one of the * terminal methods of the passed in {@link CommandContext} in order to settle the command * message transfer and finish the trace span associated with the {@link CommandContext}. + * @param lifespan The time period in which the command consumer shall be active. Using a negative duration or + * {@code null} here is interpreted as an unlimited lifespan. The guaranteed granularity + * taken into account here is seconds. * @param context The currently active OpenTracing span context or {@code null} if no span is currently active. * An implementation should use this as the parent for any span it creates for tracing * the execution of this operation. @@ -83,6 +88,7 @@ Future createCommandConsumer( String tenantId, String deviceId, Handler commandHandler, + Duration lifespan, SpanContext context); /** @@ -104,6 +110,9 @@ Future createCommandConsumer( * @param commandHandler The handler to invoke with every command received. The handler must invoke one of the * terminal methods of the passed in {@link CommandContext} in order to settle the command * message transfer and finish the trace span associated with the {@link CommandContext}. + * @param lifespan The time period in which the command consumer shall be active. Using a negative duration or + * {@code null} here is interpreted as an unlimited lifespan. The guaranteed granularity + * taken into account here is seconds. * @param context The currently active OpenTracing span context or {@code null} if no span is currently active. * An implementation should use this as the parent for any span it creates for tracing * the execution of this operation. @@ -121,6 +130,7 @@ Future createCommandConsumer( String deviceId, String gatewayId, Handler commandHandler, + Duration lifespan, SpanContext context); /** diff --git a/client/src/main/java/org/eclipse/hono/client/impl/DeviceConnectionClientImpl.java b/client/src/main/java/org/eclipse/hono/client/impl/DeviceConnectionClientImpl.java index 42732331786..e708c3b3eb3 100644 --- a/client/src/main/java/org/eclipse/hono/client/impl/DeviceConnectionClientImpl.java +++ b/client/src/main/java/org/eclipse/hono/client/impl/DeviceConnectionClientImpl.java @@ -14,6 +14,7 @@ package org.eclipse.hono.client.impl; import java.net.HttpURLConnection; +import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -260,10 +261,11 @@ public Future removeCommandHandlingAdapterInstance(final String deviceId, @Override public Future setCommandHandlingAdapterInstance(final String deviceId, final String adapterInstanceId, - final int lifespanSeconds, final SpanContext context) { + final Duration lifespan, final SpanContext context) { Objects.requireNonNull(deviceId); Objects.requireNonNull(adapterInstanceId); + final long lifespanSeconds = lifespan != null && lifespan.getSeconds() <= Integer.MAX_VALUE ? lifespan.getSeconds() : -1; final Map properties = createDeviceIdProperties(deviceId); properties.put(MessageHelper.APP_PROPERTY_ADAPTER_INSTANCE_ID, adapterInstanceId); properties.put(MessageHelper.APP_PROPERTY_LIFESPAN, lifespanSeconds); diff --git a/client/src/main/java/org/eclipse/hono/client/impl/ProtocolAdapterCommandConsumerFactoryImpl.java b/client/src/main/java/org/eclipse/hono/client/impl/ProtocolAdapterCommandConsumerFactoryImpl.java index c8f2b4721d0..eda0cd53b73 100644 --- a/client/src/main/java/org/eclipse/hono/client/impl/ProtocolAdapterCommandConsumerFactoryImpl.java +++ b/client/src/main/java/org/eclipse/hono/client/impl/ProtocolAdapterCommandConsumerFactoryImpl.java @@ -14,6 +14,8 @@ package org.eclipse.hono.client.impl; import java.net.HttpURLConnection; +import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -28,6 +30,7 @@ import org.eclipse.hono.client.MessageConsumer; import org.eclipse.hono.client.ProtocolAdapterCommandConsumerFactory; import org.eclipse.hono.client.ServerErrorException; +import org.eclipse.hono.client.ServiceInvocationException; import org.eclipse.hono.util.CommandConstants; import org.eclipse.hono.util.Constants; import org.eclipse.hono.util.ResourceIdentifier; @@ -47,7 +50,7 @@ *
    *
  • A single consumer link on an address containing the protocol adapter instance id.
  • *
  • A tenant-scoped link, created (if not already existing for that tenant) when - * {@link #createCommandConsumer(String, String, Handler, SpanContext)} is invoked.
  • + * {@link #createCommandConsumer(String, String, Handler, Duration, SpanContext)} is invoked. *
*

* Command messages are first received on the tenant-scoped consumer address. It is then determined whether there is @@ -124,11 +127,11 @@ protected void onDisconnect() { */ @Override public final Future createCommandConsumer(final String tenantId, final String deviceId, - final Handler commandHandler, final SpanContext context) { + final Handler commandHandler, final Duration lifespan, final SpanContext context) { Objects.requireNonNull(tenantId); Objects.requireNonNull(deviceId); Objects.requireNonNull(commandHandler); - return doCreateCommandConsumer(tenantId, deviceId, null, commandHandler, context); + return doCreateCommandConsumer(tenantId, deviceId, null, commandHandler, lifespan, context); } /** @@ -136,20 +139,25 @@ public final Future createCommandConsumer(final String tenantId */ @Override public final Future createCommandConsumer(final String tenantId, final String deviceId, - final String gatewayId, final Handler commandHandler, final SpanContext context) { + final String gatewayId, final Handler commandHandler, final Duration lifespan, + final SpanContext context) { Objects.requireNonNull(tenantId); Objects.requireNonNull(deviceId); Objects.requireNonNull(gatewayId); Objects.requireNonNull(commandHandler); - return doCreateCommandConsumer(tenantId, deviceId, gatewayId, commandHandler, context); + return doCreateCommandConsumer(tenantId, deviceId, gatewayId, commandHandler, lifespan, context); } private Future doCreateCommandConsumer(final String tenantId, final String deviceId, - final String gatewayId, final Handler commandHandler, final SpanContext context) { + final String gatewayId, final Handler commandHandler, final Duration lifespan, + final SpanContext context) { if (!initialized.get()) { log.error("not initialized"); return Future.failedFuture(new ServerErrorException(HttpURLConnection.HTTP_INTERNAL_ERROR)); } + // lifespan greater than what can be expressed in nanoseconds (i.e. 292 years) is considered unlimited, preventing ArithmeticExceptions down the road + final Duration sanitizedLifespan = lifespan == null || lifespan.isNegative() + || lifespan.getSeconds() > (Long.MAX_VALUE / 1000_000_000L) ? Duration.ofSeconds(-1) : lifespan; log.trace("create command consumer [tenant-id: {}, device-id: {}, gateway-id: {}]", tenantId, deviceId, gatewayId); return connection.executeOnContext(result -> { // create the tenant-scoped consumer ("command/") that maps/delegates incoming commands to the right handler/adapter-instance @@ -162,20 +170,23 @@ private Future doCreateCommandConsumer(final String tenantId, f // TODO find a way to provide a notification here so that potential resources associated with the replaced consumer can be freed (maybe add a commandHandlerOverwritten Handler param to createCommandConsumer()) } // associate handler with this adapter instance - return setCommandHandlingAdapterInstance(tenantId, deviceId, context); + return setCommandHandlingAdapterInstance(tenantId, deviceId, sanitizedLifespan, context); }) .map(res -> { - final Supplier> onCloseAction = () -> removeCommandConsumer(tenantId, deviceId, context); + final Instant lifespanStart = Instant.now(); + final Supplier> onCloseAction = () -> removeCommandConsumer(tenantId, deviceId, + sanitizedLifespan, lifespanStart, context); return (MessageConsumer) new DeviceSpecificCommandConsumer(onCloseAction); }) .setHandler(result); }); } - private Future setCommandHandlingAdapterInstance(final String tenantId, final String deviceId, final SpanContext context) { + private Future setCommandHandlingAdapterInstance(final String tenantId, final String deviceId, + final Duration lifespan, final SpanContext context) { return deviceConnectionClientFactory.getOrCreateDeviceConnectionClient(tenantId) .compose(client -> { - return client.setCommandHandlingAdapterInstance(deviceId, adapterInstanceId, -1, context); + return client.setCommandHandlingAdapterInstance(deviceId, adapterInstanceId, lifespan, context); }).recover(thr -> { log.info("error setting command handling adapter instance [tenant: {}, device: {}]", tenantId, deviceId, thr); @@ -185,16 +196,34 @@ private Future setCommandHandlingAdapterInstance(final String tenantId, fi }); } - private Future removeCommandConsumer(final String tenantId, final String deviceId, final SpanContext context) { + private Future removeCommandConsumer(final String tenantId, final String deviceId, final Duration lifespan, + final Instant lifespanStart, final SpanContext createCommandConsumerSpanContext) { log.trace("remove command consumer [tenant-id: {}, device-id: {}]", tenantId, deviceId); adapterInstanceCommandHandler.removeDeviceSpecificCommandHandler(tenantId, deviceId); + + final boolean lifespanReached = !lifespan.isNegative() && Instant.now().isAfter(lifespanStart.plus(lifespan)); + // TODO when making handling of the lifespan property mandatory for implementors of the Device Connection API, + // removing the adapter instance can be skipped here if 'lifespanReached' is true return deviceConnectionClientFactory.getOrCreateDeviceConnectionClient(tenantId) .compose(client -> { + // The given span context from the createCommandConsumer invocation is only used here if a non-unlimited lifespan is set + // meaning creating and removing the consumer will probably happen in a timespan short enough for one overall trace. + // The span context isn't used however if lifespanReached is true since the below 'remove' operation will usually (but not necessarily) + // result in a "not found" error in that case, which is expected and should not cause the overall trace to be marked with an error. + final SpanContext context = !lifespan.isNegative() && !lifespanReached ? createCommandConsumerSpanContext : null; return client.removeCommandHandlingAdapterInstance(deviceId, adapterInstanceId, context); }).recover(thr -> { - log.info("error removing command handling adapter instance [tenant: {}, device: {}]", tenantId, - deviceId, thr); - return Future.failedFuture(thr); + if (lifespanReached && thr instanceof ServiceInvocationException + && ((ServiceInvocationException) thr).getErrorCode() == HttpURLConnection.HTTP_NOT_FOUND) { + // entry was not found, meaning it has expired + log.trace("ignoring 404 error when removing command handling adapter instance; entry has already expired [tenant: {}, device: {}]", + tenantId, deviceId); + return Future.succeededFuture(); + } else { + log.info("error removing command handling adapter instance [tenant: {}, device: {}]", tenantId, + deviceId, thr); + return Future.failedFuture(thr); + } }); } diff --git a/client/src/test/java/org/eclipse/hono/client/impl/DeviceConnectionClientImplTest.java b/client/src/test/java/org/eclipse/hono/client/impl/DeviceConnectionClientImplTest.java index 0d810150af4..c0bdb18cefb 100644 --- a/client/src/test/java/org/eclipse/hono/client/impl/DeviceConnectionClientImplTest.java +++ b/client/src/test/java/org/eclipse/hono/client/impl/DeviceConnectionClientImplTest.java @@ -162,7 +162,7 @@ public void testSetLastKnownGatewayForDeviceSuccess(final VertxTestContext ctx) public void testSetCommandHandlingAdapterInstance(final VertxTestContext ctx) { // WHEN setting the last known gateway - client.setCommandHandlingAdapterInstance("deviceId", "gatewayId", -1, span.context()) + client.setCommandHandlingAdapterInstance("deviceId", "gatewayId", null, span.context()) .setHandler(ctx.succeeding(r -> { ctx.verify(() -> { // THEN the response for setting the command handling adapter instance has been handled by the service @@ -290,7 +290,7 @@ public void testSetCommandHandlingAdapterInstanceFailsWithSendError(final VertxT when(sender.sendQueueFull()).thenReturn(true); // WHEN getting last known gateway information - client.setCommandHandlingAdapterInstance("deviceId", "adapterInstanceId", -1, span.context()) + client.setCommandHandlingAdapterInstance("deviceId", "adapterInstanceId", null, span.context()) .setHandler(ctx.failing(t -> { ctx.verify(() -> { // THEN the invocation fails and the span is marked as erroneous @@ -438,7 +438,7 @@ public void testSetCommandHandlingAdapterInstanceFailsWithRejectedRequest(final }); // WHEN getting last known gateway information - client.setCommandHandlingAdapterInstance("deviceId", "adapterInstanceId", -1, span.context()) + client.setCommandHandlingAdapterInstance("deviceId", "adapterInstanceId", null, span.context()) .setHandler(ctx.failing(t -> { assertThat(((ServiceInvocationException) t).getErrorCode()).isEqualTo(HttpURLConnection.HTTP_BAD_REQUEST); ctx.verify(() -> { @@ -561,7 +561,7 @@ public void testSetLastKnownGatewayForDeviceIncludesRequiredInformationInRequest public void testSetCommandHandlingAdapterInstanceIncludesRequiredInformationInRequest() { // WHEN getting last known gateway information - client.setCommandHandlingAdapterInstance("deviceId", "adapterInstanceId", -1, span.context()); + client.setCommandHandlingAdapterInstance("deviceId", "adapterInstanceId", null, span.context()); // THEN the message being sent contains the device ID in its properties final Message sentMessage = verifySenderSend(); diff --git a/client/src/test/java/org/eclipse/hono/client/impl/ProtocolAdapterCommandConsumerFactoryImplTest.java b/client/src/test/java/org/eclipse/hono/client/impl/ProtocolAdapterCommandConsumerFactoryImplTest.java index ba8feaaa4da..f1adb6787a5 100644 --- a/client/src/test/java/org/eclipse/hono/client/impl/ProtocolAdapterCommandConsumerFactoryImplTest.java +++ b/client/src/test/java/org/eclipse/hono/client/impl/ProtocolAdapterCommandConsumerFactoryImplTest.java @@ -22,9 +22,11 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.net.HttpURLConnection; +import java.time.Duration; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -33,6 +35,7 @@ import org.eclipse.hono.client.DeviceConnectionClient; import org.eclipse.hono.client.DeviceConnectionClientFactory; import org.eclipse.hono.client.HonoConnection; +import org.eclipse.hono.client.MessageConsumer; import org.eclipse.hono.client.ServerErrorException; import org.eclipse.hono.client.ServiceInvocationException; import org.eclipse.hono.config.ClientConfigProperties; @@ -132,7 +135,8 @@ public void setUp() { when(deviceConnectionClientFactory.connect()).thenReturn(Future.succeededFuture(mock(HonoConnection.class))); when(deviceConnectionClientFactory.getOrCreateDeviceConnectionClient(anyString())) .thenReturn(Future.succeededFuture(devConClient)); - when(devConClient.setCommandHandlingAdapterInstance(anyString(), anyString(), anyInt(), any())).thenReturn(Future.succeededFuture()); + when(devConClient.setCommandHandlingAdapterInstance(anyString(), anyString(), any(), any())).thenReturn(Future.succeededFuture()); + when(devConClient.removeCommandHandlingAdapterInstance(anyString(), anyString(), any())).thenReturn(Future.succeededFuture()); commandConsumerFactory = new ProtocolAdapterCommandConsumerFactoryImpl(connection); commandConsumerFactory.initialize(commandTargetMapper, deviceConnectionClientFactory); @@ -158,7 +162,7 @@ public void testCreateCommandConsumerFailsIfPeerRejectsLink(final VertxTestConte VertxMockSupport.anyHandler())) .thenReturn(Future.failedFuture(ex)); - commandConsumerFactory.createCommandConsumer(tenantId, deviceId, commandHandler, null) + commandConsumerFactory.createCommandConsumer(tenantId, deviceId, commandHandler, null, null) .setHandler(ctx.failing(t -> { ctx.verify(() -> assertThat(((ServiceInvocationException) t).getErrorCode()).isEqualTo(HttpURLConnection.HTTP_UNAVAILABLE)); ctx.completeNow(); @@ -166,8 +170,8 @@ public void testCreateCommandConsumerFailsIfPeerRejectsLink(final VertxTestConte } /** - * Verifies that the connection successfully opens a command consumer for a - * tenant and device Id and opens a receiver link that is scoped to the device. + * Verifies that creating a command consumer successfully creates a tenant-scoped + * receiver link and registers the command handling adapter instance. * * @param ctx The test context. */ @@ -176,8 +180,46 @@ public void testCreateCommandConsumerSucceeds(final VertxTestContext ctx) { final Handler commandHandler = VertxMockSupport.mockHandler(); - commandConsumerFactory.createCommandConsumer(tenantId, deviceId, commandHandler, null) - .setHandler(ctx.completing()); + commandConsumerFactory.createCommandConsumer(tenantId, deviceId, commandHandler, null, null) + .setHandler(ctx.succeeding(c -> { + ctx.verify(() -> { + verify(connection).createReceiver(eq(tenantCommandAddress), eq(ProtonQoS.AT_LEAST_ONCE), any(), anyInt(), + eq(false), any()); + verify(devConClient).setCommandHandlingAdapterInstance(eq(deviceId), anyString(), any(), any()); + }); + ctx.completeNow(); + })); + } + + /** + * Verifies that creating a command consumer with a positive lifespan and + * closing the consumer afterwards succeeds. + * + * @param ctx The test context. + */ + @Test + public void testCreateTimeLimitedCommandConsumerSucceeds(final VertxTestContext ctx) { + + final Handler commandHandler = VertxMockSupport.mockHandler(); + + final Duration lifespan = Duration.ofSeconds(10); + final Future commandConsumerFuture = commandConsumerFactory.createCommandConsumer(tenantId, + deviceId, commandHandler, lifespan, null); + commandConsumerFuture.setHandler(ctx.succeeding(consumer -> { + ctx.verify(() -> { + verify(connection).createReceiver(eq(tenantCommandAddress), eq(ProtonQoS.AT_LEAST_ONCE), any(), anyInt(), + eq(false), any()); + verify(devConClient).setCommandHandlingAdapterInstance(eq(deviceId), anyString(), any(), any()); + // verify closing the consumer is successful + consumer.close(ctx.succeeding(v -> { + ctx.verify(() -> { + // verify command handling adapter instance has been explicitly removed (since lifespan hasn't elapsed yet) + verify(devConClient).removeCommandHandlingAdapterInstance(eq(deviceId), anyString(), any()); + }); + ctx.completeNow(); + })); + }); + })); } } diff --git a/core/src/main/java/org/eclipse/hono/util/MessageHelper.java b/core/src/main/java/org/eclipse/hono/util/MessageHelper.java index 21fc3978b9a..6613d948944 100644 --- a/core/src/main/java/org/eclipse/hono/util/MessageHelper.java +++ b/core/src/main/java/org/eclipse/hono/util/MessageHelper.java @@ -77,7 +77,7 @@ public final class MessageHelper { */ public static final String APP_PROPERTY_ADAPTER_INSTANCE_ID = "adapter_instance_id"; /** - * The name of the AMQP 1.0 message application property containing a lifespan value. + * The name of the AMQP 1.0 message application property containing a lifespan value in seconds. */ public static final String APP_PROPERTY_LIFESPAN = "lifespan"; /** diff --git a/service-base/src/main/java/org/eclipse/hono/service/AbstractProtocolAdapterBase.java b/service-base/src/main/java/org/eclipse/hono/service/AbstractProtocolAdapterBase.java index 1c1482ac10f..055918dbb42 100644 --- a/service-base/src/main/java/org/eclipse/hono/service/AbstractProtocolAdapterBase.java +++ b/service-base/src/main/java/org/eclipse/hono/service/AbstractProtocolAdapterBase.java @@ -934,6 +934,7 @@ protected final Future createCommandConsumer( Tags.COMPONENT.set(commandContext.getCurrentSpan(), getTypeName()); commandConsumer.handle(commandContext); }, + null, context); } diff --git a/service-base/src/main/java/org/eclipse/hono/service/deviceconnection/DelegatingDeviceConnectionAmqpEndpoint.java b/service-base/src/main/java/org/eclipse/hono/service/deviceconnection/DelegatingDeviceConnectionAmqpEndpoint.java index 46395335272..487a451026a 100644 --- a/service-base/src/main/java/org/eclipse/hono/service/deviceconnection/DelegatingDeviceConnectionAmqpEndpoint.java +++ b/service-base/src/main/java/org/eclipse/hono/service/deviceconnection/DelegatingDeviceConnectionAmqpEndpoint.java @@ -13,6 +13,7 @@ package org.eclipse.hono.service.deviceconnection; import java.net.HttpURLConnection; +import java.time.Duration; import java.util.List; import java.util.Objects; @@ -239,7 +240,7 @@ protected Future processSetCmdHandlingAdapterInstance(final Message req final String tenantId = targetAddress.getTenantId(); final String deviceId = MessageHelper.getDeviceId(request); final String adapterInstanceId = MessageHelper.getApplicationProperty(request.getApplicationProperties(), MessageHelper.APP_PROPERTY_ADAPTER_INSTANCE_ID, String.class); - final Integer lifespanPropertyValue = MessageHelper.getApplicationProperty(request.getApplicationProperties(), MessageHelper.APP_PROPERTY_LIFESPAN, Integer.class); + final Integer lifespanSecondsOrNull = MessageHelper.getApplicationProperty(request.getApplicationProperties(), MessageHelper.APP_PROPERTY_LIFESPAN, Integer.class); final Span span = TracingHelper.buildServerChildSpan( tracer, @@ -253,15 +254,15 @@ protected Future processSetCmdHandlingAdapterInstance(final Message req TracingHelper.logError(span, "missing tenant, device and/or adapter instance id"); resultFuture = Future.failedFuture(new ClientErrorException(HttpURLConnection.HTTP_BAD_REQUEST)); } else { - final int lifespanSeconds = lifespanPropertyValue != null ? lifespanPropertyValue : -1; + final Duration lifespan = lifespanSecondsOrNull != null ? Duration.ofSeconds(lifespanSecondsOrNull) : Duration.ofSeconds(-1); TracingHelper.TAG_TENANT_ID.set(span, tenantId); TracingHelper.TAG_DEVICE_ID.set(span, deviceId); span.setTag(MessageHelper.APP_PROPERTY_ADAPTER_INSTANCE_ID, adapterInstanceId); - span.setTag(MessageHelper.APP_PROPERTY_LIFESPAN, lifespanSeconds); + span.setTag(MessageHelper.APP_PROPERTY_LIFESPAN, lifespan.getSeconds()); log.debug("setting command handling adapter instance for tenant [{}], device [{}] to {} (lifespan: {}s)", - tenantId, deviceId, adapterInstanceId, lifespanSeconds); + tenantId, deviceId, adapterInstanceId, lifespan.getSeconds()); - resultFuture = getService().setCommandHandlingAdapterInstance(tenantId, deviceId, adapterInstanceId, lifespanSeconds, span) + resultFuture = getService().setCommandHandlingAdapterInstance(tenantId, deviceId, adapterInstanceId, lifespan, span) .map(res -> DeviceConnectionConstants.getAmqpReply( DeviceConnectionConstants.DEVICE_CONNECTION_ENDPOINT, tenantId, diff --git a/service-base/src/main/java/org/eclipse/hono/service/deviceconnection/DeviceConnectionService.java b/service-base/src/main/java/org/eclipse/hono/service/deviceconnection/DeviceConnectionService.java index d2e673c080f..ce9e24b557e 100644 --- a/service-base/src/main/java/org/eclipse/hono/service/deviceconnection/DeviceConnectionService.java +++ b/service-base/src/main/java/org/eclipse/hono/service/deviceconnection/DeviceConnectionService.java @@ -13,6 +13,7 @@ package org.eclipse.hono.service.deviceconnection; +import java.time.Duration; import java.util.List; import org.eclipse.hono.util.DeviceConnectionResult; @@ -79,8 +80,9 @@ Future setLastKnownGatewayForDevice(String tenantId, Str * @param tenantId The tenant id. * @param deviceId The device id. * @param adapterInstanceId The protocol adapter instance id. - * @param lifespanSeconds The lifespan of the mapping entry in seconds. A negative value is interpreted as an - * unlimited lifespan. + * @param lifespan The lifespan of the mapping entry. Using a negative duration or {@code null} here is + * interpreted as an unlimited lifespan. The guaranteed granularity taken into account + * here is seconds. * @param span The active OpenTracing span for this operation. It is not to be closed in this method! An * implementation should log (error) events on this span and it may set tags and use this span as the * parent for any spans created in this method. @@ -89,7 +91,7 @@ Future setLastKnownGatewayForDevice(String tenantId, Str * @throws NullPointerException if any of the parameters is {@code null}. */ Future setCommandHandlingAdapterInstance(String tenantId, String deviceId, - String adapterInstanceId, int lifespanSeconds, Span span); + String adapterInstanceId, Duration lifespan, Span span); /** * Removes the mapping information that associates the given device with the given protocol adapter instance diff --git a/services/device-connection/src/main/java/org/eclipse/hono/deviceconnection/infinispan/CacheBasedDeviceConnectionService.java b/services/device-connection/src/main/java/org/eclipse/hono/deviceconnection/infinispan/CacheBasedDeviceConnectionService.java index cb03247ee47..3d3dd0e5d32 100644 --- a/services/device-connection/src/main/java/org/eclipse/hono/deviceconnection/infinispan/CacheBasedDeviceConnectionService.java +++ b/services/device-connection/src/main/java/org/eclipse/hono/deviceconnection/infinispan/CacheBasedDeviceConnectionService.java @@ -14,6 +14,7 @@ package org.eclipse.hono.deviceconnection.infinispan; import java.net.HttpURLConnection; +import java.time.Duration; import java.util.HashSet; import java.util.List; import java.util.Objects; @@ -78,9 +79,9 @@ public Future getLastKnownGatewayForDevice( @Override public Future setCommandHandlingAdapterInstance(final String tenantId, - final String deviceId, final String adapterInstanceId, final int lifespanSeconds, + final String deviceId, final String adapterInstanceId, final Duration lifespan, final Span span) { - return cache.setCommandHandlingAdapterInstance(tenantId, deviceId, adapterInstanceId, lifespanSeconds, span.context()) + return cache.setCommandHandlingAdapterInstance(tenantId, deviceId, adapterInstanceId, lifespan, span.context()) .map(v -> DeviceConnectionResult.from(HttpURLConnection.HTTP_NO_CONTENT)) .otherwise(t -> DeviceConnectionResult.from(ServiceInvocationException.extractStatusCode(t))); } diff --git a/services/device-connection/src/test/java/org/eclipse/hono/deviceconnection/infinispan/RemoteCacheBasedDeviceConnectionServiceTest.java b/services/device-connection/src/test/java/org/eclipse/hono/deviceconnection/infinispan/RemoteCacheBasedDeviceConnectionServiceTest.java index 0c39762e6c6..0ca7f0c3ed7 100644 --- a/services/device-connection/src/test/java/org/eclipse/hono/deviceconnection/infinispan/RemoteCacheBasedDeviceConnectionServiceTest.java +++ b/services/device-connection/src/test/java/org/eclipse/hono/deviceconnection/infinispan/RemoteCacheBasedDeviceConnectionServiceTest.java @@ -15,7 +15,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; @@ -156,15 +155,15 @@ public void testSetCommandHandlingAdapterInstance(final VertxTestContext ctx) { final String deviceId = "testDevice"; final String adapterInstanceId = "adapterInstanceId"; - when(cache.setCommandHandlingAdapterInstance(anyString(), anyString(), anyString(), anyInt(), any(SpanContext.class))) + when(cache.setCommandHandlingAdapterInstance(anyString(), anyString(), anyString(), any(), any(SpanContext.class))) .thenReturn(Future.succeededFuture()); givenAStartedService() - .compose(ok -> svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstanceId, -1, span)) + .compose(ok -> svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstanceId, null, span)) .setHandler(ctx.succeeding(result -> { ctx.verify(() -> { assertThat(result.getStatus()).isEqualTo(HttpURLConnection.HTTP_NO_CONTENT); - verify(cache).setCommandHandlingAdapterInstance(eq(Constants.DEFAULT_TENANT), eq(deviceId), eq(adapterInstanceId), anyInt(), any(SpanContext.class)); + verify(cache).setCommandHandlingAdapterInstance(eq(Constants.DEFAULT_TENANT), eq(deviceId), eq(adapterInstanceId), any(), any(SpanContext.class)); }); ctx.completeNow(); })); diff --git a/services/device-registry-base/src/main/java/org/eclipse/hono/deviceregistry/service/deviceconnection/MapBasedDeviceConnectionService.java b/services/device-registry-base/src/main/java/org/eclipse/hono/deviceregistry/service/deviceconnection/MapBasedDeviceConnectionService.java index 05ea8dedbe3..ecc2218a767 100644 --- a/services/device-registry-base/src/main/java/org/eclipse/hono/deviceregistry/service/deviceconnection/MapBasedDeviceConnectionService.java +++ b/services/device-registry-base/src/main/java/org/eclipse/hono/deviceregistry/service/deviceconnection/MapBasedDeviceConnectionService.java @@ -14,6 +14,7 @@ package org.eclipse.hono.deviceregistry.service.deviceconnection; import java.net.HttpURLConnection; +import java.time.Duration; import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; @@ -24,7 +25,6 @@ import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; import org.eclipse.hono.service.deviceconnection.DeviceConnectionService; import org.eclipse.hono.util.DeviceConnectionConstants; @@ -117,7 +117,7 @@ public final Future getLastKnownGatewayForDevice(final S @Override public final Future setCommandHandlingAdapterInstance(final String tenantId, - final String deviceId, final String protocolAdapterInstanceId, final int lifespanSeconds, final Span span) { + final String deviceId, final String protocolAdapterInstanceId, final Duration lifespan, final Span span) { Objects.requireNonNull(tenantId); Objects.requireNonNull(deviceId); Objects.requireNonNull(protocolAdapterInstanceId); @@ -129,7 +129,7 @@ public final Future setCommandHandlingAdapterInstance(fi if (currentMapSize < getConfig().getMaxDevicesPerTenant() || (currentMapSize == getConfig().getMaxDevicesPerTenant() && adapterInstancesForTenantMap.containsKey(deviceId))) { adapterInstancesForTenantMap.put(deviceId, - createExpiringValue(createAdapterInstanceIdJson(protocolAdapterInstanceId), lifespanSeconds)); + new ExpiringValue<>(createAdapterInstanceIdJson(protocolAdapterInstanceId), getLifespanNanos(lifespan))); result = DeviceConnectionResult.from(HttpURLConnection.HTTP_NO_CONTENT); } else { log.debug("cannot set protocol adapter instance for handling commands of device [{}], tenant [{}]: max number of entries per tenant reached ({})", @@ -139,6 +139,14 @@ public final Future setCommandHandlingAdapterInstance(fi return Future.succeededFuture(result); } + private long getLifespanNanos(final Duration lifespan) { + // sanity check, preventing an ArithmeticException in lifespan.toNanos() + if (lifespan == null || lifespan.isNegative() || lifespan.getSeconds() > (Long.MAX_VALUE / 1000_000_000L)) { + return Long.MAX_VALUE; // "unlimited" lifespan + } + return lifespan.toNanos(); + } + private ConcurrentMap> buildAdapterInstancesForTenantMap() { return Caffeine.newBuilder() .expireAfter(new Expiry>() { @@ -163,12 +171,6 @@ public long expireAfterRead(final String key, final ExpiringValue va .build().asMap(); } - // package-private to allow override in unit tests - ExpiringValue createExpiringValue(final V adapterInstanceIdJson, final int lifespanSeconds) { - final long lifespanNanos = lifespanSeconds < 0 ? Long.MAX_VALUE : TimeUnit.SECONDS.toNanos(lifespanSeconds); - return new ExpiringValue<>(adapterInstanceIdJson, lifespanNanos); - } - @Override public final Future removeCommandHandlingAdapterInstance(final String tenantId, final String deviceId, final String adapterInstanceId, final Span span) { diff --git a/services/device-registry-base/src/test/java/org/eclipse/hono/deviceregistry/service/deviceconnection/MapBasedDeviceConnectionServiceTest.java b/services/device-registry-base/src/test/java/org/eclipse/hono/deviceregistry/service/deviceconnection/MapBasedDeviceConnectionServiceTest.java index 1292309db6f..b2decbeeec3 100644 --- a/services/device-registry-base/src/test/java/org/eclipse/hono/deviceregistry/service/deviceconnection/MapBasedDeviceConnectionServiceTest.java +++ b/services/device-registry-base/src/test/java/org/eclipse/hono/deviceregistry/service/deviceconnection/MapBasedDeviceConnectionServiceTest.java @@ -21,9 +21,9 @@ import static org.mockito.Mockito.when; import java.net.HttpURLConnection; +import java.time.Duration; import java.util.Collections; import java.util.List; -import java.util.concurrent.TimeUnit; import org.eclipse.hono.util.Constants; import org.eclipse.hono.util.DeviceConnectionConstants; @@ -63,14 +63,7 @@ public void setUp() { final Vertx vertx = mock(Vertx.class); when(vertx.eventBus()).thenReturn(eventBus); - svc = new MapBasedDeviceConnectionService() { - @Override - ExpiringValue createExpiringValue(final V adapterInstanceIdJson, final int lifespanSeconds) { - // adapt lifespan to 1ms (if not set to a negative value) so that we don't have to wait seconds for expiration tests - final long lifespanNanos = lifespanSeconds < 0 ? Long.MAX_VALUE : TimeUnit.MILLISECONDS.toNanos(1); - return new ExpiringValue<>(adapterInstanceIdJson, lifespanNanos); - } - }; + svc = new MapBasedDeviceConnectionService(); props = new MapBasedDeviceConnectionsConfigProperties(); svc.setConfig(props); } @@ -176,7 +169,7 @@ public void testSetLastKnownGatewayForDeviceFailsIfLimitReached(final VertxTestC public void testSetCommandHandlingAdapterInstanceSucceeds(final VertxTestContext ctx) { final String deviceId = "testDevice"; final String adapterInstance = "adapterInstance"; - svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, -1, span) + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, span) .setHandler(ctx.succeeding(result -> ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, result.getStatus()); ctx.completeNow(); @@ -194,14 +187,14 @@ public void testSetCommandHandlingAdapterInstanceFailsIfLimitReached(final Vertx props.setMaxDevicesPerTenant(1); final String deviceId = "testDevice"; final String adapterInstance = "adapterInstance"; - svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, -1, span) + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, span) .compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); }); // set another entry return svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, "testDevice2", - adapterInstance, -1, span); + adapterInstance, null, span); }).setHandler(ctx.succeeding(deviceConnectionResult -> ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_FORBIDDEN, deviceConnectionResult.getStatus()); assertNull(deviceConnectionResult.getPayload()); @@ -219,7 +212,7 @@ public void testSetCommandHandlingAdapterInstanceFailsIfLimitReached(final Vertx public void testRemoveCommandHandlingAdapterInstanceSucceeds(final VertxTestContext ctx) { final String deviceId = "testDevice"; final String adapterInstance = "adapterInstance"; - svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, -1, span) + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, span) .compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); @@ -243,7 +236,7 @@ public void testRemoveCommandHandlingAdapterInstanceSucceeds(final VertxTestCont public void testRemoveCommandHandlingAdapterInstanceFailsForOtherDevice(final VertxTestContext ctx) { final String deviceId = "testDevice"; final String adapterInstance = "adapterInstance"; - svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, -1, span) + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, span) .compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); @@ -267,7 +260,7 @@ public void testRemoveCommandHandlingAdapterInstanceFailsForOtherDevice(final Ve public void testRemoveCommandHandlingAdapterInstanceFailsForOtherAdapterInstance(final VertxTestContext ctx) { final String deviceId = "testDevice"; final String adapterInstance = "adapterInstance"; - svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, -1, span) + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, span) .compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); @@ -293,14 +286,14 @@ public void testRemoveCommandHandlingAdapterInstanceFailsForOtherAdapterInstance public void testRemoveCommandHandlingAdapterInstanceFailsForExpiredEntry(final Vertx vertx, final VertxTestContext ctx) { final String deviceId = "testDevice"; final String adapterInstance = "adapterInstance"; - final int lifespanSeconds = 1; - svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, lifespanSeconds, span) + final Duration lifespan = Duration.ofMillis(1); + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, lifespan, span) .compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); }); final Promise instancesPromise = Promise.promise(); - // wait 2ms so that the (adapted - see createExpiringValue() override) lifespan has elapsed + // wait 2ms so that the lifespan has elapsed vertx.setTimer(2, tid -> { svc.removeCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, span).setHandler(instancesPromise.future()); @@ -323,7 +316,7 @@ public void testRemoveCommandHandlingAdapterInstanceFailsForExpiredEntry(final V public void testGetCommandHandlingAdapterInstancesForDevice(final VertxTestContext ctx) { final String deviceId = "testDevice"; final String adapterInstance = "adapterInstance"; - svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, -1, span) + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, span) .compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); @@ -350,14 +343,14 @@ public void testGetCommandHandlingAdapterInstancesForDevice(final VertxTestConte public void testGetCommandHandlingAdapterInstancesForExpiredEntry(final Vertx vertx, final VertxTestContext ctx) { final String deviceId = "testDevice"; final String adapterInstance = "adapterInstance"; - final int lifespanSeconds = 1; - svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, lifespanSeconds, span) + final Duration lifespan = Duration.ofMillis(1); + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, lifespan, span) .compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); }); final Promise instancesPromise = Promise.promise(); - // wait 2ms so that the (adapted - see createExpiringValue() override) lifespan has elapsed + // wait 2ms so that the lifespan has elapsed vertx.setTimer(2, tid -> { svc.getCommandHandlingAdapterInstances(Constants.DEFAULT_TENANT, deviceId, Collections.emptyList(), span) @@ -402,14 +395,14 @@ public void testGetCommandHandlingAdapterInstancesForLastKnownGateway(final Vert final String otherGatewayId = "gw-2"; final List viaGateways = List.of(gatewayId, otherGatewayId); // set command handling adapter instance for gateway - svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, -1, span) + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, span) .compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); }); // set command handling adapter instance for other gateway return svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, otherGatewayId, - otherAdapterInstance, -1, span); + otherAdapterInstance, null, span); }).compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); @@ -450,14 +443,14 @@ public void testGetCommandHandlingAdapterInstancesWithMultiResultAndLastKnownGat final String gatewayIdNotInVia = "gw-old"; final List viaGateways = List.of(gatewayId, otherGatewayId); // set command handling adapter instance for gateway - svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, -1, span) + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, span) .compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); }); // set command handling adapter instance for other gateway return svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, otherGatewayId, - otherAdapterInstance, -1, span); + otherAdapterInstance, null, span); }).compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); @@ -499,14 +492,14 @@ public void testGetCommandHandlingAdapterInstancesWithMultiResultAndNoAdapterFor final String gatewayWithNoAdapterInstance = "gw-other"; final List viaGateways = List.of(gatewayId, otherGatewayId, gatewayWithNoAdapterInstance); // set command handling adapter instance for gateway - svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, -1, span) + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, span) .compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); }); // set command handling adapter instance for other gateway return svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, otherGatewayId, - otherAdapterInstance, -1, span); + otherAdapterInstance, null, span); }).compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); @@ -544,7 +537,7 @@ public void testGetCommandHandlingAdapterInstancesForLastKnownGatewayNotInVia(fi final String gatewayId = "gw-1"; final List viaGateways = Collections.singletonList("otherGatewayId"); // set command handling adapter instance for gateway - svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, -1, span) + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, span) .compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); @@ -578,14 +571,14 @@ public void testGetCommandHandlingAdapterInstancesWithLastKnownGatewayIsGivingDe final String gatewayId = "gw-1"; final List viaGateways = List.of(gatewayId); // set command handling adapter instance for device - svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, -1, span) + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, span) .compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); }); // set command handling adapter instance for other gateway return svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, - otherAdapterInstance, -1, span); + otherAdapterInstance, null, span); }).compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); @@ -623,14 +616,14 @@ public void testGetCommandHandlingAdapterInstancesWithoutLastKnownGatewayIsGivin final String gatewayId = "gw-1"; final List viaGateways = List.of(gatewayId); // set command handling adapter instance for device - svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, -1, span) + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, span) .compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); }); // set command handling adapter instance for other gateway return svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, - otherAdapterInstance, -1, span); + otherAdapterInstance, null, span); }).compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); @@ -661,7 +654,7 @@ public void testGetCommandHandlingAdapterInstancesForOneSubscribedVia(final Vert final String otherGatewayId = "gw-2"; final List viaGateways = List.of(gatewayId, otherGatewayId); // set command handling adapter instance for gateway - svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, -1, span) + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, span) .compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); @@ -692,14 +685,14 @@ public void testGetCommandHandlingAdapterInstancesForMultipleSubscribedVias(fina final String otherGatewayId = "gw-2"; final List viaGateways = List.of(gatewayId, otherGatewayId); // set command handling adapter instance for gateway - svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, -1, span) + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, span) .compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); }); // set command handling adapter instance for other gateway return svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, otherGatewayId, - otherAdapterInstance, -1, span); + otherAdapterInstance, null, span); }).compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); @@ -748,7 +741,7 @@ public void testGetCommandHandlingAdapterInstancesFailsForOtherTenantDevice(fina final String otherGatewayId = "gw-2"; final List viaGateways = List.of(gatewayId); // set command handling adapter instance for other gateway - svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, otherGatewayId, adapterInstance, -1, span) + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, otherGatewayId, adapterInstance, null, span) .compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); diff --git a/tests/src/test/java/org/eclipse/hono/tests/jms/JmsBasedDeviceConnectionClient.java b/tests/src/test/java/org/eclipse/hono/tests/jms/JmsBasedDeviceConnectionClient.java index fc5df948797..fecb52c44be 100644 --- a/tests/src/test/java/org/eclipse/hono/tests/jms/JmsBasedDeviceConnectionClient.java +++ b/tests/src/test/java/org/eclipse/hono/tests/jms/JmsBasedDeviceConnectionClient.java @@ -15,6 +15,7 @@ package org.eclipse.hono.tests.jms; import java.net.HttpURLConnection; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Optional; @@ -113,9 +114,10 @@ public Future setLastKnownGatewayForDevice( public Future setCommandHandlingAdapterInstance( final String deviceId, final String adapterInstanceId, - final int lifespanSeconds, + final Duration lifespan, final SpanContext context) { + final long lifespanSeconds = lifespan != null && lifespan.getSeconds() <= Integer.MAX_VALUE ? lifespan.getSeconds() : -1; return sendRequest( DeviceConnectionAction.SET_CMD_HANDLING_ADAPTER_INSTANCE.getSubject(), Map.of(MessageHelper.APP_PROPERTY_DEVICE_ID, deviceId, diff --git a/tests/src/test/java/org/eclipse/hono/tests/registry/DeviceConnectionApiTests.java b/tests/src/test/java/org/eclipse/hono/tests/registry/DeviceConnectionApiTests.java index de59045145b..9e32c5263d0 100644 --- a/tests/src/test/java/org/eclipse/hono/tests/registry/DeviceConnectionApiTests.java +++ b/tests/src/test/java/org/eclipse/hono/tests/registry/DeviceConnectionApiTests.java @@ -17,6 +17,7 @@ import static org.assertj.core.api.Assertions.assertThat; import java.net.HttpURLConnection; +import java.time.Duration; import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -117,7 +118,7 @@ public void testSetCommandHandlingAdapterInstanceSucceeds(final VertxTestContext final String adapterInstance = randomId(); getClient(Constants.DEFAULT_TENANT) - .compose(client -> client.setCommandHandlingAdapterInstance(deviceId, adapterInstance, -1, null).map(client)) + .compose(client -> client.setCommandHandlingAdapterInstance(deviceId, adapterInstance, null, null).map(client)) .compose(client -> client.getCommandHandlingAdapterInstances(deviceId, List.of(), null)) .setHandler(ctx.succeeding(r -> { ctx.verify(() -> { @@ -144,11 +145,11 @@ public void testGetCommandHandlingAdapterInstancesFailsForExpiredEntry(final Ver final String deviceId = randomId(); final String adapterInstance = randomId(); - final int lifespanSeconds = 1; + final Duration lifespan = Duration.ofSeconds(1); getClient(Constants.DEFAULT_TENANT) .compose(client -> { - return client.setCommandHandlingAdapterInstance(deviceId, adapterInstance, lifespanSeconds, null).map(client); + return client.setCommandHandlingAdapterInstance(deviceId, adapterInstance, lifespan, null).map(client); }) .compose(client -> { final Promise instancesPromise = Promise.promise();