diff --git a/client-device-connection-infinispan/src/main/java/org/eclipse/hono/deviceconnection/infinispan/client/BasicCache.java b/client-device-connection-infinispan/src/main/java/org/eclipse/hono/deviceconnection/infinispan/client/BasicCache.java index 4d620c01c5..7ed40efece 100644 --- a/client-device-connection-infinispan/src/main/java/org/eclipse/hono/deviceconnection/infinispan/client/BasicCache.java +++ b/client-device-connection-infinispan/src/main/java/org/eclipse/hono/deviceconnection/infinispan/client/BasicCache.java @@ -200,9 +200,12 @@ protected Future withCache(final Function put(final K key, final V value) { + Objects.requireNonNull(key); + Objects.requireNonNull(value); return withCache(cache -> cache.putAsync(key, value)); @@ -218,14 +221,42 @@ public Future put(final K key, final V value) { * @return A succeeded future containing the previous value or {@code null} if the * cache didn't contain the key yet. * A failed future if the value could not be stored in the cache. + * @throws NullPointerException if any of the parameters is {@code null}. */ @Override public Future put(final K key, final V value, final long lifespan, final TimeUnit lifespanUnit) { + Objects.requireNonNull(key); + Objects.requireNonNull(value); + Objects.requireNonNull(lifespanUnit); return withCache(cache -> cache.putAsync(key, value, lifespan, lifespanUnit)); } + /** + * Replaces the entry for a key only if currently mapped to a given value. + * + * @param key The key. + * @param oldValue The value to overwrite. + * @param newValue The value to store. + * @param lifespan The lifespan of the entry. A negative value is interpreted as an unlimited lifespan. + * @param lifespanUnit The time unit for the lifespan. + * @return A succeeded future containing a boolean, indicating whether the value was replaced or not. + * A failed future if the value could not be stored in the cache. + * @throws NullPointerException if any of the parameters is {@code null}. + */ + @Override + public Future replace(final K key, final V oldValue, final V newValue, final long lifespan, + final TimeUnit lifespanUnit) { + Objects.requireNonNull(key); + Objects.requireNonNull(oldValue); + Objects.requireNonNull(newValue); + Objects.requireNonNull(lifespanUnit); + + return withCache(cache -> cache.replaceAsync(key, oldValue, newValue, lifespan, lifespanUnit)); + + } + /** * Remove a key/value mapping from the cache. * @@ -233,9 +264,12 @@ public Future put(final K key, final V value, final long lifespan, final Time * @param value The value. * @return {@code true} if the key was mapped to the value, {@code false} * otherwise. + * @throws NullPointerException if any of the parameters is {@code null}. */ @Override public Future remove(final K key, final V value) { + Objects.requireNonNull(key); + Objects.requireNonNull(value); return withCache(cache -> cache.removeAsync(key, value)); @@ -248,9 +282,11 @@ public Future remove(final K key, final V value) { * @return A succeeded future containing the value or {@code null} if the * cache didn't contain the key yet. * A failed future if the value could not be read from the cache. + * @throws NullPointerException if key is {@code null}. */ @Override public Future get(final K key) { + Objects.requireNonNull(key); return withCache(cache -> cache.getAsync(key)); @@ -261,9 +297,11 @@ public Future get(final K key) { * * @param keys The keys. * @return A succeeded future containing a map with key/value pairs. + * @throws NullPointerException if keys is {@code null}. */ @Override public Future> getAll(final Set keys) { + Objects.requireNonNull(keys); return withCache(cache -> cache.getAllAsync(keys)); diff --git a/client-device-connection-infinispan/src/main/java/org/eclipse/hono/deviceconnection/infinispan/client/Cache.java b/client-device-connection-infinispan/src/main/java/org/eclipse/hono/deviceconnection/infinispan/client/Cache.java index ba53635f31..10638263f6 100644 --- a/client-device-connection-infinispan/src/main/java/org/eclipse/hono/deviceconnection/infinispan/client/Cache.java +++ b/client-device-connection-infinispan/src/main/java/org/eclipse/hono/deviceconnection/infinispan/client/Cache.java @@ -45,6 +45,7 @@ public interface Cache { * @return A succeeded future containing the previous value or {@code null} if the * cache didn't contain the key yet. * A failed future if the value could not be stored in the cache. + * @throws NullPointerException if any of the parameters is {@code null}. */ Future put(K key, V value); @@ -58,9 +59,24 @@ public interface Cache { * @return A succeeded future containing the previous value or {@code null} if the * cache didn't contain the key yet. * A failed future if the value could not be stored in the cache. + * @throws NullPointerException if any of the parameters is {@code null}. */ Future put(K key, V value, long lifespan, TimeUnit lifespanUnit); + /** + * Replaces the entry for a key only if currently mapped to a given value. + * + * @param key The key. + * @param oldValue The value to overwrite. + * @param newValue The value to store. + * @param lifespan The lifespan of the entry. A negative value is interpreted as an unlimited lifespan. + * @param lifespanUnit The time unit for the lifespan. + * @return A succeeded future containing a boolean, indicating whether the value was replaced or not. + * A failed future if the value could not be stored in the cache. + * @throws NullPointerException if any of the parameters is {@code null}. + */ + Future replace(K key, V oldValue, V newValue, long lifespan, TimeUnit lifespanUnit); + /** * Gets a value from the cache. * @@ -68,6 +84,7 @@ public interface Cache { * @return A succeeded future containing the value or {@code null} if the * cache didn't contain the key yet. * A failed future if the value could not be read from the cache. + * @throws NullPointerException if key is {@code null}. */ Future get(K key); @@ -79,6 +96,7 @@ public interface Cache { * @return A succeeded future containing {@code true} if the key was * mapped to the value, {@code false} otherwise. * A failed future if the value could not be remove from the cache. + * @throws NullPointerException if any of the parameters is {@code null}. */ Future remove(K key, V value); @@ -87,6 +105,7 @@ public interface Cache { * * @param keys The keys. * @return A succeeded future containing a map with key/value pairs. + * @throws NullPointerException if keys is {@code null}. */ Future> getAll(Set keys); } diff --git a/client-device-connection-infinispan/src/main/java/org/eclipse/hono/deviceconnection/infinispan/client/CacheBasedDeviceConnectionClient.java b/client-device-connection-infinispan/src/main/java/org/eclipse/hono/deviceconnection/infinispan/client/CacheBasedDeviceConnectionClient.java index 386c421630..b8c528b2ba 100644 --- a/client-device-connection-infinispan/src/main/java/org/eclipse/hono/deviceconnection/infinispan/client/CacheBasedDeviceConnectionClient.java +++ b/client-device-connection-infinispan/src/main/java/org/eclipse/hono/deviceconnection/infinispan/client/CacheBasedDeviceConnectionClient.java @@ -117,8 +117,8 @@ public Future getLastKnownGatewayForDevice(final String deviceId, fi @Override public Future setCommandHandlingAdapterInstance(final String deviceId, final String adapterInstanceId, - final Duration lifespan, final SpanContext context) { - return cache.setCommandHandlingAdapterInstance(tenantId, deviceId, adapterInstanceId, lifespan, context); + final Duration lifespan, final boolean updateOnly, final SpanContext context) { + return cache.setCommandHandlingAdapterInstance(tenantId, deviceId, adapterInstanceId, lifespan, updateOnly, context); } @Override diff --git a/client-device-connection-infinispan/src/main/java/org/eclipse/hono/deviceconnection/infinispan/client/CacheBasedDeviceConnectionInfo.java b/client-device-connection-infinispan/src/main/java/org/eclipse/hono/deviceconnection/infinispan/client/CacheBasedDeviceConnectionInfo.java index 9d7d3ae951..8216fc6925 100644 --- a/client-device-connection-infinispan/src/main/java/org/eclipse/hono/deviceconnection/infinispan/client/CacheBasedDeviceConnectionInfo.java +++ b/client-device-connection-infinispan/src/main/java/org/eclipse/hono/deviceconnection/infinispan/client/CacheBasedDeviceConnectionInfo.java @@ -140,7 +140,7 @@ public Future getLastKnownGatewayForDevice(final String tenantId, fi @Override public Future setCommandHandlingAdapterInstance(final String tenantId, final String deviceId, - final String adapterInstanceId, final Duration lifespan, final SpanContext context) { + final String adapterInstanceId, final Duration lifespan, final boolean updateOnly, final SpanContext context) { Objects.requireNonNull(tenantId); Objects.requireNonNull(deviceId); Objects.requireNonNull(adapterInstanceId); @@ -148,17 +148,38 @@ public Future setCommandHandlingAdapterInstance(final String tenantId, fin // sanity check, preventing an ArithmeticException in lifespan.toMillis() final long lifespanMillis = lifespan == null || lifespan.isNegative() || lifespan.getSeconds() > (Long.MAX_VALUE / 1000L) ? -1 : lifespan.toMillis(); - return cache.put(getAdapterInstanceEntryKey(tenantId, deviceId), adapterInstanceId, lifespanMillis, TimeUnit.MILLISECONDS) - .map(replacedValue -> { - LOG.debug("set command handling adapter instance [tenant: {}, device-id: {}, adapter-instance: {}, lifespan: {}ms]", - tenantId, deviceId, adapterInstanceId, lifespanMillis); - return (Void) null; - }) - .recover(t -> { - LOG.debug("failed to set command handling adapter instance [tenant: {}, device-id: {}, adapter-instance: {}, lifespan: {}ms]", - tenantId, deviceId, adapterInstanceId, lifespanMillis, t); - return Future.failedFuture(new ServerErrorException(HttpURLConnection.HTTP_INTERNAL_ERROR, t)); - }); + final String key = getAdapterInstanceEntryKey(tenantId, deviceId); + if (updateOnly) { + return cache.replace(key, adapterInstanceId, adapterInstanceId, lifespanMillis, TimeUnit.MILLISECONDS) + .recover(t -> { + LOG.debug("failed to update command handling adapter instance [tenant: {}, device-id: {}, adapter-instance: {}, lifespan: {}ms]", + tenantId, deviceId, adapterInstanceId, lifespanMillis, t); + return Future.failedFuture(new ServerErrorException(HttpURLConnection.HTTP_INTERNAL_ERROR, t)); + }) + .compose(replaced -> { + if (replaced) { + LOG.debug("updated command handling adapter instance [tenant: {}, device-id: {}, adapter-instance: {}, lifespan: {}ms]", + tenantId, deviceId, adapterInstanceId, lifespanMillis); + return Future.succeededFuture(); + } else { + LOG.debug("cannot update non-existing command handling adapter instance entry [tenant: {}, device-id: {}, adapter-instance: {}, lifespan: {}ms]", + tenantId, deviceId, adapterInstanceId, lifespanMillis); + return Future.failedFuture(new ClientErrorException(HttpURLConnection.HTTP_PRECON_FAILED)); + } + }); + } else { + return cache.put(key, adapterInstanceId, lifespanMillis, TimeUnit.MILLISECONDS) + .recover(t -> { + LOG.debug("failed to set command handling adapter instance [tenant: {}, device-id: {}, adapter-instance: {}, lifespan: {}ms]", + tenantId, deviceId, adapterInstanceId, lifespanMillis, t); + return Future.failedFuture(new ServerErrorException(HttpURLConnection.HTTP_INTERNAL_ERROR, t)); + }) + .map(replacedValue -> { + LOG.debug("set command handling adapter instance [tenant: {}, device-id: {}, adapter-instance: {}, lifespan: {}ms]", + tenantId, deviceId, adapterInstanceId, lifespanMillis); + return (Void) null; + }); + } } @Override diff --git a/client-device-connection-infinispan/src/main/java/org/eclipse/hono/deviceconnection/infinispan/client/DeviceConnectionInfo.java b/client-device-connection-infinispan/src/main/java/org/eclipse/hono/deviceconnection/infinispan/client/DeviceConnectionInfo.java index 59a29ed4ed..d0581dd672 100644 --- a/client-device-connection-infinispan/src/main/java/org/eclipse/hono/deviceconnection/infinispan/client/DeviceConnectionInfo.java +++ b/client-device-connection-infinispan/src/main/java/org/eclipse/hono/deviceconnection/infinispan/client/DeviceConnectionInfo.java @@ -76,6 +76,8 @@ public interface DeviceConnectionInfo { * @param adapterInstanceId The protocol adapter instance id. * @param lifespan The lifespan of the mapping entry. Using a negative duration or {@code null} here is * interpreted as an unlimited lifespan. + * @param updateOnly If {@code true}, this operation will only update an existing mapping entry with the given + * lifespan, provided such an entry with the given device id and adapter instance id exists. * @param context The currently active OpenTracing span context or {@code null} if no span is currently active. * Implementing classes should use this as the parent for any span they create for tracing * the execution of this operation. @@ -86,7 +88,7 @@ public interface DeviceConnectionInfo { * @throws NullPointerException if any of the parameters except context is {@code null}. */ Future setCommandHandlingAdapterInstance(String tenantId, String deviceId, String adapterInstanceId, - Duration lifespan, SpanContext context); + Duration lifespan, boolean updateOnly, SpanContext context); /** * Removes the mapping information that associates the given device with the given protocol adapter instance diff --git a/client-device-connection-infinispan/src/test/java/org/eclipse/hono/deviceconnection/infinispan/client/CacheBasedDeviceConnectionInfoTest.java b/client-device-connection-infinispan/src/test/java/org/eclipse/hono/deviceconnection/infinispan/client/CacheBasedDeviceConnectionInfoTest.java index 397a31e622..23b880908d 100644 --- a/client-device-connection-infinispan/src/test/java/org/eclipse/hono/deviceconnection/infinispan/client/CacheBasedDeviceConnectionInfoTest.java +++ b/client-device-connection-infinispan/src/test/java/org/eclipse/hono/deviceconnection/infinispan/client/CacheBasedDeviceConnectionInfoTest.java @@ -197,7 +197,8 @@ void testGetLastKnownGatewayFails(final VertxTestContext ctx) { public void testSetCommandHandlingAdapterInstanceSucceeds(final VertxTestContext ctx) { final String deviceId = "testDevice"; final String adapterInstance = "adapterInstance"; - info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, spanContext) + info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, false, + spanContext) .setHandler(ctx.succeeding(result -> ctx.completeNow())); } @@ -211,10 +212,54 @@ public void testSetCommandHandlingAdapterInstanceSucceeds(final VertxTestContext public void testSetCommandHandlingAdapterInstanceWithLifespanSucceeds(final VertxTestContext ctx) { final String deviceId = "testDevice"; final String adapterInstance = "adapterInstance"; - info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, Duration.ofSeconds(10), spanContext) + info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, Duration.ofSeconds(10), + false, spanContext) .setHandler(ctx.succeeding(result -> ctx.completeNow())); } + /** + * Verifies that the setCommandHandlingAdapterInstance operation with updateOnly set to + * {@code true} succeeds. + * + * @param ctx The vert.x context. + */ + @Test + public void testSetCommandHandlingAdapterInstanceWithUpdateOnlyTrueSucceeds(final VertxTestContext ctx) { + final String deviceId = "testDevice"; + final String adapterInstance = "adapterInstance"; + // first invocation initially adds the entry + info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, false, + spanContext) + // now update the entry + .compose(v -> info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, + true, spanContext)) + .setHandler(ctx.succeeding(result -> ctx.completeNow())); + } + + /** + * Verifies that the setCommandHandlingAdapterInstance operation with updateOnly set to + * {@code true} fails with a PRECON_FAILED status if the given adapter instance parameter doesn't match the one of + * the entry registered for the given device. + * + * @param ctx The vert.x context. + */ + @Test + public void testSetCommandHandlingAdapterInstanceWithUpdateOnlyTrueFails(final VertxTestContext ctx) { + final String deviceId = "testDevice"; + final String adapterInstance = "adapterInstance"; + // first invocation initially adds the entry + info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, false, + spanContext) + // now try to update the entry, but with another adapter instance + .compose(v -> info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, "otherAdapterInstance", + null, true, spanContext)) + .setHandler(ctx.failing(t -> ctx.verify(() -> { + assertThat(t).isInstanceOf(ServiceInvocationException.class); + assertThat(((ServiceInvocationException) t).getErrorCode()).isEqualTo(HttpURLConnection.HTTP_PRECON_FAILED); + ctx.completeNow(); + }))); + } + /** * Verifies that the removeCommandHandlingAdapterInstance operation succeeds if there was an entry to be deleted. * @@ -224,7 +269,8 @@ public void testSetCommandHandlingAdapterInstanceWithLifespanSucceeds(final Vert public void testRemoveCommandHandlingAdapterInstanceSucceeds(final VertxTestContext ctx) { final String deviceId = "testDevice"; final String adapterInstance = "adapterInstance"; - info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, spanContext) + info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, false, + spanContext) .compose(v -> info.removeCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, spanContext)) .setHandler(ctx.succeeding(result -> ctx.verify(() -> { @@ -244,7 +290,8 @@ public void testRemoveCommandHandlingAdapterInstanceSucceeds(final VertxTestCont public void testRemoveCommandHandlingAdapterInstanceFailsForOtherDevice(final VertxTestContext ctx) { final String deviceId = "testDevice"; final String adapterInstance = "adapterInstance"; - info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, spanContext) + info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, false, + spanContext) .compose(v -> { return info.removeCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, "otherDevice", adapterInstance, spanContext); }).setHandler(ctx.succeeding(result -> ctx.verify(() -> { @@ -263,7 +310,8 @@ public void testRemoveCommandHandlingAdapterInstanceFailsForOtherDevice(final Ve public void testRemoveCommandHandlingAdapterInstanceFailsForOtherAdapterInstance(final VertxTestContext ctx) { final String deviceId = "testDevice"; final String adapterInstance = "adapterInstance"; - info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, spanContext) + info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, false, + spanContext) .compose(v -> { return info.removeCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, "otherAdapterInstance", spanContext); }).setHandler(ctx.succeeding(result -> ctx.verify(() -> { @@ -282,7 +330,8 @@ public void testRemoveCommandHandlingAdapterInstanceFailsForOtherAdapterInstance public void testGetCommandHandlingAdapterInstancesForDevice(final VertxTestContext ctx) { final String deviceId = "testDevice"; final String adapterInstance = "adapterInstance"; - info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, spanContext) + info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, false, + spanContext) .compose(v -> { return info.getCommandHandlingAdapterInstances(Constants.DEFAULT_TENANT, deviceId, Collections.emptySet(), spanContext); }).setHandler(ctx.succeeding(result -> ctx.verify(() -> { @@ -307,7 +356,8 @@ public void testGetCommandHandlingAdapterInstancesWithExpiredEntry(final Vertx v final Cache mockedCache = spy(cache); info = new CacheBasedDeviceConnectionInfo(mockedCache, tracer); - info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, Duration.ofMillis(1), spanContext) + info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, Duration.ofMillis(1), + false, spanContext) .compose(v -> { final Promise instancesPromise = Promise.promise(); // wait 2ms to make sure entry has expired after that @@ -360,10 +410,12 @@ public void testGetCommandHandlingAdapterInstancesForLastKnownGateway(final Set< final Set viaGateways = new HashSet<>(Set.of(gatewayId, otherGatewayId)); viaGateways.addAll(extraUnusedViaGateways); // set command handling adapter instance for gateway - info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, spanContext) + info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, false, + spanContext) .compose(v -> { // set command handling adapter instance for other gateway - return info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, otherGatewayId, otherAdapterInstance, null, spanContext); + return info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, otherGatewayId, otherAdapterInstance, null, + false, spanContext); }).compose(deviceConnectionResult -> { return info.setLastKnownGatewayForDevice(Constants.DEFAULT_TENANT, deviceId, gatewayId, spanContext); }).compose(u -> { @@ -397,10 +449,12 @@ public void testGetCommandHandlingAdapterInstancesWithMultiResultAndLastKnownGat final Set viaGateways = new HashSet<>(Set.of(gatewayId, otherGatewayId)); viaGateways.addAll(extraUnusedViaGateways); // set command handling adapter instance for gateway - info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, spanContext) + info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, false, + spanContext) .compose(v -> { // set command handling adapter instance for other gateway - return info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, otherGatewayId, otherAdapterInstance, null, spanContext); + return info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, otherGatewayId, otherAdapterInstance, null, + false, spanContext); }).compose(deviceConnectionResult -> { return info.setLastKnownGatewayForDevice(Constants.DEFAULT_TENANT, deviceId, gatewayIdNotInVia, spanContext); }).compose(u -> { @@ -435,11 +489,13 @@ public void testGetCommandHandlingAdapterInstancesWithMultiResultAndNoAdapterFor final Set viaGateways = new HashSet<>(Set.of(gatewayId, otherGatewayId, gatewayWithNoAdapterInstance)); viaGateways.addAll(extraUnusedViaGateways); // set command handling adapter instance for gateway - info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, spanContext) - .compose(v -> { - // set command handling adapter instance for other gateway - return info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, otherGatewayId, otherAdapterInstance, null, spanContext); - }).compose(deviceConnectionResult -> { + info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, false, + spanContext) + .compose(v -> { + // set command handling adapter instance for other gateway + return info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, otherGatewayId, otherAdapterInstance, null, + false, spanContext); + }).compose(deviceConnectionResult -> { return info.setLastKnownGatewayForDevice(Constants.DEFAULT_TENANT, deviceId, gatewayWithNoAdapterInstance, spanContext); }).compose(u -> { return info.getCommandHandlingAdapterInstances(Constants.DEFAULT_TENANT, deviceId, viaGateways, spanContext); @@ -470,7 +526,8 @@ public void testGetCommandHandlingAdapterInstancesForSingleResultAndLastKnownGat final Set viaGateways = new HashSet<>(Set.of("otherGatewayId")); viaGateways.addAll(extraUnusedViaGateways); // set command handling adapter instance for gateway - info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, spanContext) + info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, false, + spanContext) .compose(deviceConnectionResult -> { return info.setLastKnownGatewayForDevice(Constants.DEFAULT_TENANT, deviceId, gatewayId, spanContext); }).compose(u -> { @@ -500,10 +557,12 @@ public void testGetCommandHandlingAdapterInstancesWithLastKnownGatewayIsGivingDe final Set viaGateways = new HashSet<>(Set.of(gatewayId)); viaGateways.addAll(extraUnusedViaGateways); // set command handling adapter instance for device - info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, spanContext) + info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, false, + spanContext) .compose(v -> { // set command handling adapter instance for other gateway - return info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, otherAdapterInstance, null, spanContext); + return info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, otherAdapterInstance, null, + false, spanContext); }).compose(u -> { return info.setLastKnownGatewayForDevice(Constants.DEFAULT_TENANT, deviceId, gatewayId, spanContext); }).compose(w -> { @@ -535,10 +594,12 @@ public void testGetCommandHandlingAdapterInstancesWithoutLastKnownGatewayIsGivin final Set viaGateways = new HashSet<>(Set.of(gatewayId)); viaGateways.addAll(extraUnusedViaGateways); // set command handling adapter instance for device - info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, spanContext) + info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, false, + spanContext) .compose(v -> { // set command handling adapter instance for other gateway - return info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, otherAdapterInstance, null, spanContext); + return info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, otherAdapterInstance, null, + false, spanContext); }).compose(w -> { return info.getCommandHandlingAdapterInstances(Constants.DEFAULT_TENANT, deviceId, viaGateways, spanContext); }).setHandler(ctx.succeeding(result -> ctx.verify(() -> { @@ -567,7 +628,8 @@ public void testGetCommandHandlingAdapterInstancesForOneSubscribedVia(final Set< final Set viaGateways = new HashSet<>(Set.of(gatewayId, otherGatewayId)); viaGateways.addAll(extraUnusedViaGateways); // set command handling adapter instance for gateway - info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, spanContext) + info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, false, + spanContext) .compose(v -> { return info.getCommandHandlingAdapterInstances(Constants.DEFAULT_TENANT, deviceId, viaGateways, spanContext); }).setHandler(ctx.succeeding(result -> ctx.verify(() -> { @@ -596,10 +658,12 @@ public void testGetCommandHandlingAdapterInstancesForMultipleSubscribedVias(fina final Set viaGateways = new HashSet<>(Set.of(gatewayId, otherGatewayId)); viaGateways.addAll(extraUnusedViaGateways); // set command handling adapter instance for gateway - info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, spanContext) + info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, false, + spanContext) .compose(v -> { // set command handling adapter instance for other gateway - return info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, otherGatewayId, otherAdapterInstance, null, spanContext); + return info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, otherGatewayId, otherAdapterInstance, null, + false, spanContext); }).compose(u -> { return info.getCommandHandlingAdapterInstances(Constants.DEFAULT_TENANT, deviceId, viaGateways, spanContext); }).setHandler(ctx.succeeding(result -> ctx.verify(() -> { @@ -650,7 +714,8 @@ public void testGetCommandHandlingAdapterInstancesFailsForOtherTenantDevice(fina final Set viaGateways = new HashSet<>(Set.of(gatewayId)); viaGateways.addAll(extraUnusedViaGateways); // set command handling adapter instance for other gateway - info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, otherGatewayId, adapterInstance, null, spanContext) + info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, otherGatewayId, adapterInstance, null, false, + spanContext) .compose(v -> { return info.getCommandHandlingAdapterInstances(Constants.DEFAULT_TENANT, deviceId, viaGateways, spanContext); }).setHandler(ctx.failing(t -> ctx.verify(() -> { diff --git a/client/src/main/java/org/eclipse/hono/client/DeviceConnectionClient.java b/client/src/main/java/org/eclipse/hono/client/DeviceConnectionClient.java index 65ac1df056..fdacc5bb76 100644 --- a/client/src/main/java/org/eclipse/hono/client/DeviceConnectionClient.java +++ b/client/src/main/java/org/eclipse/hono/client/DeviceConnectionClient.java @@ -76,6 +76,8 @@ public interface DeviceConnectionClient extends RequestResponseClient { * @param lifespan The lifespan of the mapping entry. Using a negative duration or {@code null} here is * interpreted as an unlimited lifespan. Only the number of seconds in the given duration * will be taken into account. + * @param updateOnly If {@code true}, this operation will only update an existing mapping entry with the given + * lifespan, provided such an entry with the given device id and adapter instance id exists. * @param context The currently active OpenTracing span context or {@code null} if no span is currently active. * An implementation should use this as the parent for any span it creates for tracing * the execution of this operation. @@ -83,7 +85,7 @@ public interface DeviceConnectionClient extends RequestResponseClient { * @throws NullPointerException if device id or adapter instance id is {@code null}. */ Future setCommandHandlingAdapterInstance(String deviceId, String adapterInstanceId, Duration lifespan, - SpanContext context); + boolean updateOnly, SpanContext context); /** * Removes the mapping information that associates the given device with the given protocol adapter instance diff --git a/client/src/main/java/org/eclipse/hono/client/impl/DeviceConnectionClientImpl.java b/client/src/main/java/org/eclipse/hono/client/impl/DeviceConnectionClientImpl.java index 09f8f963b1..6055942bbf 100644 --- a/client/src/main/java/org/eclipse/hono/client/impl/DeviceConnectionClientImpl.java +++ b/client/src/main/java/org/eclipse/hono/client/impl/DeviceConnectionClientImpl.java @@ -263,7 +263,7 @@ public Future removeCommandHandlingAdapterInstance(final String deviceI @Override public Future setCommandHandlingAdapterInstance(final String deviceId, final String adapterInstanceId, - final Duration lifespan, final SpanContext context) { + final Duration lifespan, final boolean updateOnly, final SpanContext context) { Objects.requireNonNull(deviceId); Objects.requireNonNull(adapterInstanceId); @@ -271,6 +271,7 @@ public Future setCommandHandlingAdapterInstance(final String deviceId, fin final Map properties = createDeviceIdProperties(deviceId); properties.put(MessageHelper.APP_PROPERTY_ADAPTER_INSTANCE_ID, adapterInstanceId); properties.put(MessageHelper.APP_PROPERTY_LIFESPAN, lifespanSeconds); + properties.put(MessageHelper.APP_PROPERTY_UPDATE_ONLY, updateOnly); final Span currentSpan = newChildSpan(context, "set command handling adapter instance"); final Promise resultTracker = Promise.promise(); diff --git a/client/src/main/java/org/eclipse/hono/client/impl/ProtocolAdapterCommandConsumerFactoryImpl.java b/client/src/main/java/org/eclipse/hono/client/impl/ProtocolAdapterCommandConsumerFactoryImpl.java index 60628e3e31..1f10bfaeff 100644 --- a/client/src/main/java/org/eclipse/hono/client/impl/ProtocolAdapterCommandConsumerFactoryImpl.java +++ b/client/src/main/java/org/eclipse/hono/client/impl/ProtocolAdapterCommandConsumerFactoryImpl.java @@ -182,9 +182,9 @@ private Future doCreateCommandConsumer(final String tenantId, f private Future setCommandHandlingAdapterInstance(final String tenantId, final String deviceId, final Duration lifespan, final SpanContext context) { return deviceConnectionClientFactory.getOrCreateDeviceConnectionClient(tenantId) - .compose(client -> { - return client.setCommandHandlingAdapterInstance(deviceId, adapterInstanceId, lifespan, context); - }).recover(thr -> { + .compose(client -> client.setCommandHandlingAdapterInstance(deviceId, adapterInstanceId, lifespan, + false, context)) + .recover(thr -> { log.info("error setting command handling adapter instance [tenant: {}, device: {}]", tenantId, deviceId, thr); // handler association failed - unregister the handler diff --git a/client/src/test/java/org/eclipse/hono/client/impl/DeviceConnectionClientImplTest.java b/client/src/test/java/org/eclipse/hono/client/impl/DeviceConnectionClientImplTest.java index 6b41535d95..aec6669cef 100644 --- a/client/src/test/java/org/eclipse/hono/client/impl/DeviceConnectionClientImplTest.java +++ b/client/src/test/java/org/eclipse/hono/client/impl/DeviceConnectionClientImplTest.java @@ -22,6 +22,7 @@ import static org.mockito.Mockito.when; import java.net.HttpURLConnection; +import java.time.Duration; import java.util.Collections; import org.apache.qpid.proton.amqp.messaging.Rejected; @@ -160,7 +161,7 @@ public void testSetLastKnownGatewayForDeviceSuccess(final VertxTestContext ctx) public void testSetCommandHandlingAdapterInstance(final VertxTestContext ctx) { // WHEN setting the command handling adapter instance - client.setCommandHandlingAdapterInstance("deviceId", "gatewayId", null, span.context()) + client.setCommandHandlingAdapterInstance("deviceId", "adapterInstanceId", null, false, span.context()) .setHandler(ctx.succeeding(r -> { ctx.verify(() -> { // THEN the response has been handled and the span is finished @@ -184,7 +185,7 @@ public void testSetCommandHandlingAdapterInstance(final VertxTestContext ctx) { public void testRemoveCommandHandlingAdapterInstance(final VertxTestContext ctx) { // WHEN removing the command handling adapter instance - client.removeCommandHandlingAdapterInstance("deviceId", "gatewayId", span.context()) + client.removeCommandHandlingAdapterInstance("deviceId", "adapterInstanceId", span.context()) .setHandler(ctx.succeeding(result -> { ctx.verify(() -> { // THEN the response has been handled and the span is finished @@ -301,7 +302,7 @@ public void testSetCommandHandlingAdapterInstanceFailsWithSendError(final VertxT when(sender.sendQueueFull()).thenReturn(true); // WHEN setting the command handling adapter instance - client.setCommandHandlingAdapterInstance("deviceId", "adapterInstanceId", null, span.context()) + client.setCommandHandlingAdapterInstance("deviceId", "adapterInstanceId", null, false, span.context()) .setHandler(ctx.failing(t -> { ctx.verify(() -> { // THEN the invocation fails and the span is marked as erroneous @@ -477,7 +478,7 @@ public void testSetCommandHandlingAdapterInstanceFailsWithRejectedRequest(final }); // WHEN setting the command handling adapter instance - client.setCommandHandlingAdapterInstance("deviceId", "adapterInstanceId", null, span.context()) + client.setCommandHandlingAdapterInstance("deviceId", "adapterInstanceId", null, false, span.context()) .setHandler(ctx.failing(t -> { assertThat(((ServiceInvocationException) t).getErrorCode()).isEqualTo(HttpURLConnection.HTTP_BAD_REQUEST); ctx.verify(() -> { @@ -584,13 +585,17 @@ public void testGetLastKnownGatewayForDeviceIncludesRequiredInformationInRequest public void testSetLastKnownGatewayForDeviceIncludesRequiredInformationInRequest() { final String deviceId = "deviceId"; + final String gatewayId = "gatewayId"; // WHEN setting last known gateway information - client.setLastKnownGatewayForDevice(deviceId, "gatewayId", span.context()); + client.setLastKnownGatewayForDevice(deviceId, gatewayId, span.context()); // THEN the message being sent contains the device ID in its properties final Message sentMessage = verifySenderSend(); assertThat(MessageHelper.getDeviceId(sentMessage)).isEqualTo(deviceId); + assertThat(MessageHelper.getApplicationProperty(sentMessage.getApplicationProperties(), + MessageHelper.APP_PROPERTY_GATEWAY_ID, String.class)) + .isEqualTo(gatewayId); assertThat(sentMessage.getMessageId()).isNotNull(); assertThat(sentMessage.getSubject()).isEqualTo(DeviceConnectionConstants.DeviceConnectionAction.SET_LAST_GATEWAY.getSubject()); assertThat(MessageHelper.getJsonPayload(sentMessage)).isNull(); @@ -606,11 +611,51 @@ public void testSetCommandHandlingAdapterInstanceIncludesRequiredInformationInRe final String deviceId = "deviceId"; // WHEN setting the command handling adapter instance - client.setCommandHandlingAdapterInstance(deviceId, "adapterInstanceId", null, span.context()); + client.setCommandHandlingAdapterInstance(deviceId, "adapterInstanceId", null, false, span.context()); // THEN the message being sent contains the device ID in its properties final Message sentMessage = verifySenderSend(); assertThat(MessageHelper.getDeviceId(sentMessage)).isEqualTo(deviceId); + assertThat(MessageHelper.getApplicationProperty(sentMessage.getApplicationProperties(), + MessageHelper.APP_PROPERTY_ADAPTER_INSTANCE_ID, String.class)) + .isEqualTo("adapterInstanceId"); + assertThat(MessageHelper.getApplicationProperty(sentMessage.getApplicationProperties(), + MessageHelper.APP_PROPERTY_LIFESPAN, Integer.class)) + .isEqualTo(Integer.valueOf(-1)); + assertThat(MessageHelper.getApplicationProperty(sentMessage.getApplicationProperties(), + MessageHelper.APP_PROPERTY_UPDATE_ONLY, Boolean.class)) + .isFalse(); + assertThat(sentMessage.getMessageId()).isNotNull(); + assertThat(sentMessage.getSubject()).isEqualTo(DeviceConnectionConstants.DeviceConnectionAction.SET_CMD_HANDLING_ADAPTER_INSTANCE.getSubject()); + assertThat(MessageHelper.getJsonPayload(sentMessage)).isNull(); + } + + /** + * Verifies that the client includes the required information in the set-cmd-handling-adapter-instance operation + * request message sent to the device connection service, including the lifespan parameter. + */ + @Test + public void testSetCommandHandlingAdapterInstanceWithLifespanIncludesRequiredInformationInRequest() { + + final String deviceId = "deviceId"; + final int lifespanSeconds = 20; + + // WHEN setting the command handling adapter instance + client.setCommandHandlingAdapterInstance(deviceId, "adapterInstanceId", + Duration.ofSeconds(lifespanSeconds), true, span.context()); + + // THEN the message being sent contains the device ID in its properties + final Message sentMessage = verifySenderSend(); + assertThat(MessageHelper.getDeviceId(sentMessage)).isEqualTo(deviceId); + assertThat(MessageHelper.getApplicationProperty(sentMessage.getApplicationProperties(), + MessageHelper.APP_PROPERTY_ADAPTER_INSTANCE_ID, String.class)) + .isEqualTo("adapterInstanceId"); + assertThat(MessageHelper.getApplicationProperty(sentMessage.getApplicationProperties(), + MessageHelper.APP_PROPERTY_LIFESPAN, Integer.class)) + .isEqualTo(lifespanSeconds); + assertThat(MessageHelper.getApplicationProperty(sentMessage.getApplicationProperties(), + MessageHelper.APP_PROPERTY_UPDATE_ONLY, Boolean.class)) + .isTrue(); assertThat(sentMessage.getMessageId()).isNotNull(); assertThat(sentMessage.getSubject()).isEqualTo(DeviceConnectionConstants.DeviceConnectionAction.SET_CMD_HANDLING_ADAPTER_INSTANCE.getSubject()); assertThat(MessageHelper.getJsonPayload(sentMessage)).isNull(); @@ -624,13 +669,17 @@ public void testSetCommandHandlingAdapterInstanceIncludesRequiredInformationInRe public void testRemoveCommandHandlingAdapterInstanceIncludesRequiredInformationInRequest() { final String deviceId = "deviceId"; + final String adapterInstanceId = "adapterInstanceId"; // WHEN removing the command handling adapter instance - client.removeCommandHandlingAdapterInstance(deviceId, "adapterInstanceId", span.context()); + client.removeCommandHandlingAdapterInstance(deviceId, adapterInstanceId, span.context()); // THEN the message being sent contains the device ID in its properties final Message sentMessage = verifySenderSend(); assertThat(MessageHelper.getDeviceId(sentMessage)).isEqualTo(deviceId); + assertThat(MessageHelper.getApplicationProperty(sentMessage.getApplicationProperties(), + MessageHelper.APP_PROPERTY_ADAPTER_INSTANCE_ID, String.class)) + .isEqualTo(adapterInstanceId); assertThat(sentMessage.getMessageId()).isNotNull(); assertThat(sentMessage.getSubject()).isEqualTo(DeviceConnectionConstants.DeviceConnectionAction.REMOVE_CMD_HANDLING_ADAPTER_INSTANCE.getSubject()); assertThat(MessageHelper.getJsonPayload(sentMessage)).isNull(); diff --git a/client/src/test/java/org/eclipse/hono/client/impl/ProtocolAdapterCommandConsumerFactoryImplTest.java b/client/src/test/java/org/eclipse/hono/client/impl/ProtocolAdapterCommandConsumerFactoryImplTest.java index 9b93615fee..176475d457 100644 --- a/client/src/test/java/org/eclipse/hono/client/impl/ProtocolAdapterCommandConsumerFactoryImplTest.java +++ b/client/src/test/java/org/eclipse/hono/client/impl/ProtocolAdapterCommandConsumerFactoryImplTest.java @@ -135,7 +135,7 @@ public void setUp() { when(deviceConnectionClientFactory.connect()).thenReturn(Future.succeededFuture(mock(HonoConnection.class))); when(deviceConnectionClientFactory.getOrCreateDeviceConnectionClient(anyString())) .thenReturn(Future.succeededFuture(devConClient)); - when(devConClient.setCommandHandlingAdapterInstance(anyString(), anyString(), any(), any())) + when(devConClient.setCommandHandlingAdapterInstance(anyString(), anyString(), any(), anyBoolean(), any())) .thenReturn(Future.succeededFuture()); when(devConClient.removeCommandHandlingAdapterInstance(anyString(), anyString(), any())) .thenReturn(Future.succeededFuture(Boolean.TRUE)); @@ -187,7 +187,7 @@ public void testCreateCommandConsumerSucceeds(final VertxTestContext ctx) { ctx.verify(() -> { verify(connection).createReceiver(eq(tenantCommandAddress), eq(ProtonQoS.AT_LEAST_ONCE), any(), anyInt(), eq(false), any()); - verify(devConClient).setCommandHandlingAdapterInstance(eq(deviceId), anyString(), any(), any()); + verify(devConClient).setCommandHandlingAdapterInstance(eq(deviceId), anyString(), any(), eq(false), any()); }); ctx.completeNow(); })); @@ -211,7 +211,7 @@ public void testCreateTimeLimitedCommandConsumerSucceeds(final VertxTestContext ctx.verify(() -> { verify(connection).createReceiver(eq(tenantCommandAddress), eq(ProtonQoS.AT_LEAST_ONCE), any(), anyInt(), eq(false), any()); - verify(devConClient).setCommandHandlingAdapterInstance(eq(deviceId), anyString(), any(), any()); + verify(devConClient).setCommandHandlingAdapterInstance(eq(deviceId), anyString(), any(), eq(false), any()); // verify closing the consumer is successful consumer.close(ctx.succeeding(v -> { ctx.verify(() -> { diff --git a/core/src/main/java/org/eclipse/hono/util/MessageHelper.java b/core/src/main/java/org/eclipse/hono/util/MessageHelper.java index d4ed941497..ab718dbc0a 100644 --- a/core/src/main/java/org/eclipse/hono/util/MessageHelper.java +++ b/core/src/main/java/org/eclipse/hono/util/MessageHelper.java @@ -80,6 +80,11 @@ public final class MessageHelper { * The name of the AMQP 1.0 message application property containing a lifespan value in seconds. */ public static final String APP_PROPERTY_LIFESPAN = "lifespan"; + /** + * The name of the AMQP 1.0 message application property defining whether the operation should only + * update an existing value or not. + */ + public static final String APP_PROPERTY_UPDATE_ONLY = "update_only"; /** * The name of the AMQP 1.0 application property that is used to convey the address that a message has been * originally published to by a device. diff --git a/service-base/src/main/java/org/eclipse/hono/service/deviceconnection/DelegatingDeviceConnectionAmqpEndpoint.java b/service-base/src/main/java/org/eclipse/hono/service/deviceconnection/DelegatingDeviceConnectionAmqpEndpoint.java index 487a451026..99d73c3998 100644 --- a/service-base/src/main/java/org/eclipse/hono/service/deviceconnection/DelegatingDeviceConnectionAmqpEndpoint.java +++ b/service-base/src/main/java/org/eclipse/hono/service/deviceconnection/DelegatingDeviceConnectionAmqpEndpoint.java @@ -16,6 +16,7 @@ import java.time.Duration; import java.util.List; import java.util.Objects; +import java.util.Optional; import org.apache.qpid.proton.message.Message; import org.eclipse.hono.client.ClientErrorException; @@ -241,6 +242,9 @@ protected Future processSetCmdHandlingAdapterInstance(final Message req final String deviceId = MessageHelper.getDeviceId(request); final String adapterInstanceId = MessageHelper.getApplicationProperty(request.getApplicationProperties(), MessageHelper.APP_PROPERTY_ADAPTER_INSTANCE_ID, String.class); final Integer lifespanSecondsOrNull = MessageHelper.getApplicationProperty(request.getApplicationProperties(), MessageHelper.APP_PROPERTY_LIFESPAN, Integer.class); + final boolean updateOnly = Optional + .ofNullable(MessageHelper.getApplicationProperty(request.getApplicationProperties(), MessageHelper.APP_PROPERTY_UPDATE_ONLY, Boolean.class)) + .orElse(false); final Span span = TracingHelper.buildServerChildSpan( tracer, @@ -259,10 +263,12 @@ protected Future processSetCmdHandlingAdapterInstance(final Message req TracingHelper.TAG_DEVICE_ID.set(span, deviceId); span.setTag(MessageHelper.APP_PROPERTY_ADAPTER_INSTANCE_ID, adapterInstanceId); span.setTag(MessageHelper.APP_PROPERTY_LIFESPAN, lifespan.getSeconds()); - log.debug("setting command handling adapter instance for tenant [{}], device [{}] to {} (lifespan: {}s)", - tenantId, deviceId, adapterInstanceId, lifespan.getSeconds()); + span.setTag(MessageHelper.APP_PROPERTY_UPDATE_ONLY, updateOnly); + log.debug("setting command handling adapter instance for tenant [{}], device [{}] to {} (lifespan: {}s, updateOnly: {})", + tenantId, deviceId, adapterInstanceId, lifespan.getSeconds(), updateOnly); - resultFuture = getService().setCommandHandlingAdapterInstance(tenantId, deviceId, adapterInstanceId, lifespan, span) + resultFuture = getService().setCommandHandlingAdapterInstance(tenantId, deviceId, adapterInstanceId, lifespan, + updateOnly, span) .map(res -> DeviceConnectionConstants.getAmqpReply( DeviceConnectionConstants.DEVICE_CONNECTION_ENDPOINT, tenantId, diff --git a/service-base/src/main/java/org/eclipse/hono/service/deviceconnection/DeviceConnectionService.java b/service-base/src/main/java/org/eclipse/hono/service/deviceconnection/DeviceConnectionService.java index ce9e24b557..5a5faaab20 100644 --- a/service-base/src/main/java/org/eclipse/hono/service/deviceconnection/DeviceConnectionService.java +++ b/service-base/src/main/java/org/eclipse/hono/service/deviceconnection/DeviceConnectionService.java @@ -83,6 +83,8 @@ Future setLastKnownGatewayForDevice(String tenantId, Str * @param lifespan The lifespan of the mapping entry. Using a negative duration or {@code null} here is * interpreted as an unlimited lifespan. The guaranteed granularity taken into account * here is seconds. + * @param updateOnly If {@code true}, this operation will only update an existing mapping entry with the given + * lifespan, provided such an entry with the given device id and adapter instance id exists. * @param span The active OpenTracing span for this operation. It is not to be closed in this method! An * implementation should log (error) events on this span and it may set tags and use this span as the * parent for any spans created in this method. @@ -91,7 +93,7 @@ Future setLastKnownGatewayForDevice(String tenantId, Str * @throws NullPointerException if any of the parameters is {@code null}. */ Future setCommandHandlingAdapterInstance(String tenantId, String deviceId, - String adapterInstanceId, Duration lifespan, Span span); + String adapterInstanceId, Duration lifespan, boolean updateOnly, Span span); /** * Removes the mapping information that associates the given device with the given protocol adapter instance diff --git a/services/device-connection/src/main/java/org/eclipse/hono/deviceconnection/infinispan/CacheBasedDeviceConnectionService.java b/services/device-connection/src/main/java/org/eclipse/hono/deviceconnection/infinispan/CacheBasedDeviceConnectionService.java index 3d3dd0e5d3..38f081a166 100644 --- a/services/device-connection/src/main/java/org/eclipse/hono/deviceconnection/infinispan/CacheBasedDeviceConnectionService.java +++ b/services/device-connection/src/main/java/org/eclipse/hono/deviceconnection/infinispan/CacheBasedDeviceConnectionService.java @@ -80,8 +80,8 @@ public Future getLastKnownGatewayForDevice( @Override public Future setCommandHandlingAdapterInstance(final String tenantId, final String deviceId, final String adapterInstanceId, final Duration lifespan, - final Span span) { - return cache.setCommandHandlingAdapterInstance(tenantId, deviceId, adapterInstanceId, lifespan, span.context()) + final boolean updateOnly, final Span span) { + return cache.setCommandHandlingAdapterInstance(tenantId, deviceId, adapterInstanceId, lifespan, updateOnly, span.context()) .map(v -> DeviceConnectionResult.from(HttpURLConnection.HTTP_NO_CONTENT)) .otherwise(t -> DeviceConnectionResult.from(ServiceInvocationException.extractStatusCode(t))); } diff --git a/services/device-connection/src/test/java/org/eclipse/hono/deviceconnection/infinispan/RemoteCacheBasedDeviceConnectionServiceTest.java b/services/device-connection/src/test/java/org/eclipse/hono/deviceconnection/infinispan/RemoteCacheBasedDeviceConnectionServiceTest.java index 0ca7f0c3ed..b93956de7b 100644 --- a/services/device-connection/src/test/java/org/eclipse/hono/deviceconnection/infinispan/RemoteCacheBasedDeviceConnectionServiceTest.java +++ b/services/device-connection/src/test/java/org/eclipse/hono/deviceconnection/infinispan/RemoteCacheBasedDeviceConnectionServiceTest.java @@ -15,6 +15,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; @@ -155,17 +156,20 @@ public void testSetCommandHandlingAdapterInstance(final VertxTestContext ctx) { final String deviceId = "testDevice"; final String adapterInstanceId = "adapterInstanceId"; - when(cache.setCommandHandlingAdapterInstance(anyString(), anyString(), anyString(), any(), any(SpanContext.class))) + final boolean updateOnly = true; + when(cache.setCommandHandlingAdapterInstance(anyString(), anyString(), anyString(), any(), anyBoolean(), any(SpanContext.class))) .thenReturn(Future.succeededFuture()); givenAStartedService() - .compose(ok -> svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstanceId, null, span)) - .setHandler(ctx.succeeding(result -> { - ctx.verify(() -> { - assertThat(result.getStatus()).isEqualTo(HttpURLConnection.HTTP_NO_CONTENT); - verify(cache).setCommandHandlingAdapterInstance(eq(Constants.DEFAULT_TENANT), eq(deviceId), eq(adapterInstanceId), any(), any(SpanContext.class)); - }); - ctx.completeNow(); - })); + .compose(ok -> svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstanceId, null, + updateOnly, span)) + .setHandler(ctx.succeeding(result -> { + ctx.verify(() -> { + assertThat(result.getStatus()).isEqualTo(HttpURLConnection.HTTP_NO_CONTENT); + verify(cache).setCommandHandlingAdapterInstance(eq(Constants.DEFAULT_TENANT), eq(deviceId), eq(adapterInstanceId), any(), + eq(updateOnly), any(SpanContext.class)); + }); + ctx.completeNow(); + })); } } diff --git a/services/device-registry-base/src/main/java/org/eclipse/hono/deviceregistry/service/deviceconnection/MapBasedDeviceConnectionService.java b/services/device-registry-base/src/main/java/org/eclipse/hono/deviceregistry/service/deviceconnection/MapBasedDeviceConnectionService.java index ecc2218a76..57aa85e909 100644 --- a/services/device-registry-base/src/main/java/org/eclipse/hono/deviceregistry/service/deviceconnection/MapBasedDeviceConnectionService.java +++ b/services/device-registry-base/src/main/java/org/eclipse/hono/deviceregistry/service/deviceconnection/MapBasedDeviceConnectionService.java @@ -25,6 +25,7 @@ import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.function.Predicate; import org.eclipse.hono.service.deviceconnection.DeviceConnectionService; import org.eclipse.hono.util.DeviceConnectionConstants; @@ -117,7 +118,8 @@ public final Future getLastKnownGatewayForDevice(final S @Override public final Future setCommandHandlingAdapterInstance(final String tenantId, - final String deviceId, final String protocolAdapterInstanceId, final Duration lifespan, final Span span) { + final String deviceId, final String protocolAdapterInstanceId, final Duration lifespan, + final boolean updateOnly, final Span span) { Objects.requireNonNull(tenantId); Objects.requireNonNull(deviceId); Objects.requireNonNull(protocolAdapterInstanceId); @@ -128,9 +130,19 @@ public final Future setCommandHandlingAdapterInstance(fi final int currentMapSize = adapterInstancesForTenantMap.size(); if (currentMapSize < getConfig().getMaxDevicesPerTenant() || (currentMapSize == getConfig().getMaxDevicesPerTenant() && adapterInstancesForTenantMap.containsKey(deviceId))) { - adapterInstancesForTenantMap.put(deviceId, - new ExpiringValue<>(createAdapterInstanceIdJson(protocolAdapterInstanceId), getLifespanNanos(lifespan))); - result = DeviceConnectionResult.from(HttpURLConnection.HTTP_NO_CONTENT); + final ExpiringValue newValue = new ExpiringValue<>( + createAdapterInstanceIdJson(protocolAdapterInstanceId), getLifespanNanos(lifespan)); + if (updateOnly) { + if (replaceIfMatching(adapterInstancesForTenantMap, deviceId, newValue, + v -> protocolAdapterInstanceId.equals(getAdapterInstanceIdFromJson(v.getValue())))) { + result = DeviceConnectionResult.from(HttpURLConnection.HTTP_NO_CONTENT); + } else { + result = DeviceConnectionResult.from(HttpURLConnection.HTTP_PRECON_FAILED); + } + } else { + adapterInstancesForTenantMap.put(deviceId, newValue); + result = DeviceConnectionResult.from(HttpURLConnection.HTTP_NO_CONTENT); + } } else { log.debug("cannot set protocol adapter instance for handling commands of device [{}], tenant [{}]: max number of entries per tenant reached ({})", deviceId, tenantId, getConfig().getMaxDevicesPerTenant()); @@ -159,13 +171,13 @@ public long expireAfterCreate(final String key, final ExpiringValue @Override public long expireAfterUpdate(final String key, final ExpiringValue value, final long currentTime, final long currentDuration) { - return Long.MAX_VALUE; + return value.getLifespanNanos(); } @Override public long expireAfterRead(final String key, final ExpiringValue value, final long currentTime, final long currentDuration) { - return Long.MAX_VALUE; + return currentDuration; } }) .build().asMap(); @@ -300,6 +312,22 @@ private static V getValue(final Map> map, final K key return expiringValue != null ? expiringValue.getValue() : null; } + /** + * Replaces the entry for a key only if evaluating the given predicate on the current value returns {@code true}. + */ + private static boolean replaceIfMatching(final ConcurrentMap map, final K key, final V newValue, + final Predicate matchingPredicate) { + for (V oldValue; (oldValue = map.get(key)) != null;) { + if (!matchingPredicate.test(oldValue)) { + return false; + } + if (map.replace(key, oldValue, newValue)) { + return true; + } + } + return false; + } + /** * Keeps a value along with a lifespan. * @@ -316,11 +344,11 @@ static class ExpiringValue { * @param value The value. * @param lifespanNanos The lifespan in nanoseconds. To indicate no expiration an excessively * long period may be given, such as {@code Long#MAX_VALUE}. - * @throws NullPointerException if any of the parameters is {@code null}. + * @throws NullPointerException if value is {@code null}. */ ExpiringValue(final T value, final long lifespanNanos) { this.value = Objects.requireNonNull(value); - this.lifespanNanos = Objects.requireNonNull(lifespanNanos); + this.lifespanNanos = lifespanNanos; } /** diff --git a/services/device-registry-base/src/test/java/org/eclipse/hono/deviceregistry/service/deviceconnection/MapBasedDeviceConnectionServiceTest.java b/services/device-registry-base/src/test/java/org/eclipse/hono/deviceregistry/service/deviceconnection/MapBasedDeviceConnectionServiceTest.java index b2decbeeec..4fe61c4869 100644 --- a/services/device-registry-base/src/test/java/org/eclipse/hono/deviceregistry/service/deviceconnection/MapBasedDeviceConnectionServiceTest.java +++ b/services/device-registry-base/src/test/java/org/eclipse/hono/deviceregistry/service/deviceconnection/MapBasedDeviceConnectionServiceTest.java @@ -169,7 +169,7 @@ public void testSetLastKnownGatewayForDeviceFailsIfLimitReached(final VertxTestC public void testSetCommandHandlingAdapterInstanceSucceeds(final VertxTestContext ctx) { final String deviceId = "testDevice"; final String adapterInstance = "adapterInstance"; - svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, span) + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, false, span) .setHandler(ctx.succeeding(result -> ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, result.getStatus()); ctx.completeNow(); @@ -187,14 +187,14 @@ public void testSetCommandHandlingAdapterInstanceFailsIfLimitReached(final Vertx props.setMaxDevicesPerTenant(1); final String deviceId = "testDevice"; final String adapterInstance = "adapterInstance"; - svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, span) + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, false, span) .compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); }); // set another entry return svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, "testDevice2", - adapterInstance, null, span); + adapterInstance, null, false, span); }).setHandler(ctx.succeeding(deviceConnectionResult -> ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_FORBIDDEN, deviceConnectionResult.getStatus()); assertNull(deviceConnectionResult.getPayload()); @@ -202,6 +202,59 @@ public void testSetCommandHandlingAdapterInstanceFailsIfLimitReached(final Vertx }))); } + /** + * Verifies that the setCommandHandlingAdapterInstance operation with updateOnly set to + * {@code true} succeeds. + * + * @param ctx The vert.x context. + */ + @Test + public void testSetCommandHandlingAdapterInstanceWithUpdateOnlyTrueSucceeds(final VertxTestContext ctx) { + final String deviceId = "testDevice"; + final String adapterInstance = "adapterInstance"; + // first invocation initially adds the entry + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, false, span) + .compose(deviceConnectionResult -> { + ctx.verify(() -> { + assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); + }); + // now update the entry + return svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, + null, true, span); + }).setHandler(ctx.succeeding(result -> ctx.verify(() -> { + assertEquals(HttpURLConnection.HTTP_NO_CONTENT, result.getStatus()); + assertNull(result.getPayload()); + ctx.completeNow(); + }))); + } + + /** + * Verifies that the setCommandHandlingAdapterInstance operation with updateOnly set to + * {@code true} fails with a PRECON_FAILED status if the given adapter instance parameter doesn't match the one of + * the entry registered for the given device. + * + * @param ctx The vert.x context. + */ + @Test + public void testSetCommandHandlingAdapterInstanceWithUpdateOnlyTrueFails(final VertxTestContext ctx) { + final String deviceId = "testDevice"; + final String adapterInstance = "adapterInstance"; + // first invocation initially adds the entry + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, false, span) + .compose(deviceConnectionResult -> { + ctx.verify(() -> { + assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); + }); + // now try to update the entry, but with another adapter instance + return svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, + "otherAdapterInstance", null, true, span); + }).setHandler(ctx.succeeding(result -> ctx.verify(() -> { + assertEquals(HttpURLConnection.HTTP_PRECON_FAILED, result.getStatus()); + assertNull(result.getPayload()); + ctx.completeNow(); + }))); + } + /** * Verifies that the removeCommandHandlingAdapterInstance operation succeeds if there was an entry to be * deleted. @@ -212,7 +265,7 @@ public void testSetCommandHandlingAdapterInstanceFailsIfLimitReached(final Vertx public void testRemoveCommandHandlingAdapterInstanceSucceeds(final VertxTestContext ctx) { final String deviceId = "testDevice"; final String adapterInstance = "adapterInstance"; - svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, span) + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, false, span) .compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); @@ -236,7 +289,7 @@ public void testRemoveCommandHandlingAdapterInstanceSucceeds(final VertxTestCont public void testRemoveCommandHandlingAdapterInstanceFailsForOtherDevice(final VertxTestContext ctx) { final String deviceId = "testDevice"; final String adapterInstance = "adapterInstance"; - svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, span) + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, false, span) .compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); @@ -260,7 +313,7 @@ public void testRemoveCommandHandlingAdapterInstanceFailsForOtherDevice(final Ve public void testRemoveCommandHandlingAdapterInstanceFailsForOtherAdapterInstance(final VertxTestContext ctx) { final String deviceId = "testDevice"; final String adapterInstance = "adapterInstance"; - svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, span) + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, false, span) .compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); @@ -287,7 +340,7 @@ public void testRemoveCommandHandlingAdapterInstanceFailsForExpiredEntry(final V final String deviceId = "testDevice"; final String adapterInstance = "adapterInstance"; final Duration lifespan = Duration.ofMillis(1); - svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, lifespan, span) + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, lifespan, false, span) .compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); @@ -300,10 +353,10 @@ public void testRemoveCommandHandlingAdapterInstanceFailsForExpiredEntry(final V }); return instancesPromise.future(); }).setHandler(ctx.succeeding(result -> ctx.verify(() -> { - assertEquals(HttpURLConnection.HTTP_NOT_FOUND, result.getStatus()); - assertNull(result.getPayload()); - ctx.completeNow(); - }))); + assertEquals(HttpURLConnection.HTTP_NOT_FOUND, result.getStatus()); + assertNull(result.getPayload()); + ctx.completeNow(); + }))); } /** @@ -316,7 +369,7 @@ public void testRemoveCommandHandlingAdapterInstanceFailsForExpiredEntry(final V public void testGetCommandHandlingAdapterInstancesForDevice(final VertxTestContext ctx) { final String deviceId = "testDevice"; final String adapterInstance = "adapterInstance"; - svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, span) + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, false, span) .compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); @@ -344,7 +397,7 @@ public void testGetCommandHandlingAdapterInstancesForExpiredEntry(final Vertx ve final String deviceId = "testDevice"; final String adapterInstance = "adapterInstance"; final Duration lifespan = Duration.ofMillis(1); - svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, lifespan, span) + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, lifespan, false, span) .compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); @@ -363,6 +416,46 @@ public void testGetCommandHandlingAdapterInstancesForExpiredEntry(final Vertx ve }))); } + /** + * Verifies that the getCommandHandlingAdapterInstances operation fails if the adapter instance + * entry has expired after having been updated with a short lifespan. + * + * @param vertx The vert.x instance. + * @param ctx The vert.x context. + */ + @Test + public void testGetCommandHandlingAdapterInstancesForUpdatedExpiredEntry(final Vertx vertx, final VertxTestContext ctx) { + final String deviceId = "testDevice"; + final String adapterInstance = "adapterInstance"; + final Duration firstLongLifespan = Duration.ofSeconds(300); + final Duration secondShortLifespan = Duration.ofMillis(1); + // create entry with a long lifespan first + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, firstLongLifespan, false, span) + .compose(deviceConnectionResult -> { + ctx.verify(() -> { + assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); + }); + // now invoke with "updateOnly" and use the short lifespan + return svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, + secondShortLifespan, true, span); + }).compose(deviceConnectionResult -> { + ctx.verify(() -> { + assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); + }); + final Promise instancesPromise = Promise.promise(); + // wait 2ms so that the lifespan has elapsed + vertx.setTimer(2, tid -> { + svc.getCommandHandlingAdapterInstances(Constants.DEFAULT_TENANT, deviceId, + Collections.emptyList(), span) + .setHandler(instancesPromise.future()); + }); + return instancesPromise.future(); + }).setHandler(ctx.succeeding(result -> ctx.verify(() -> { + assertEquals(HttpURLConnection.HTTP_NOT_FOUND, result.getStatus()); + ctx.completeNow(); + }))); + } + /** * Verifies that the getCommandHandlingAdapterInstances operation fails for a device with no viaGateways, * if no matching instance has been registered. @@ -395,14 +488,14 @@ public void testGetCommandHandlingAdapterInstancesForLastKnownGateway(final Vert final String otherGatewayId = "gw-2"; final List viaGateways = List.of(gatewayId, otherGatewayId); // set command handling adapter instance for gateway - svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, span) + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, false, span) .compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); }); // set command handling adapter instance for other gateway return svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, otherGatewayId, - otherAdapterInstance, null, span); + otherAdapterInstance, null, false, span); }).compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); @@ -443,14 +536,14 @@ public void testGetCommandHandlingAdapterInstancesWithMultiResultAndLastKnownGat final String gatewayIdNotInVia = "gw-old"; final List viaGateways = List.of(gatewayId, otherGatewayId); // set command handling adapter instance for gateway - svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, span) + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, false, span) .compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); }); // set command handling adapter instance for other gateway return svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, otherGatewayId, - otherAdapterInstance, null, span); + otherAdapterInstance, null, false, span); }).compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); @@ -492,14 +585,14 @@ public void testGetCommandHandlingAdapterInstancesWithMultiResultAndNoAdapterFor final String gatewayWithNoAdapterInstance = "gw-other"; final List viaGateways = List.of(gatewayId, otherGatewayId, gatewayWithNoAdapterInstance); // set command handling adapter instance for gateway - svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, span) + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, false, span) .compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); }); // set command handling adapter instance for other gateway return svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, otherGatewayId, - otherAdapterInstance, null, span); + otherAdapterInstance, null, false, span); }).compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); @@ -537,7 +630,7 @@ public void testGetCommandHandlingAdapterInstancesForLastKnownGatewayNotInVia(fi final String gatewayId = "gw-1"; final List viaGateways = Collections.singletonList("otherGatewayId"); // set command handling adapter instance for gateway - svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, span) + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, false, span) .compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); @@ -571,14 +664,14 @@ public void testGetCommandHandlingAdapterInstancesWithLastKnownGatewayIsGivingDe final String gatewayId = "gw-1"; final List viaGateways = List.of(gatewayId); // set command handling adapter instance for device - svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, span) + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, false, span) .compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); }); // set command handling adapter instance for other gateway return svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, - otherAdapterInstance, null, span); + otherAdapterInstance, null, false, span); }).compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); @@ -616,14 +709,14 @@ public void testGetCommandHandlingAdapterInstancesWithoutLastKnownGatewayIsGivin final String gatewayId = "gw-1"; final List viaGateways = List.of(gatewayId); // set command handling adapter instance for device - svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, span) + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, false, span) .compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); }); // set command handling adapter instance for other gateway return svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, - otherAdapterInstance, null, span); + otherAdapterInstance, null, false, span); }).compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); @@ -654,7 +747,7 @@ public void testGetCommandHandlingAdapterInstancesForOneSubscribedVia(final Vert final String otherGatewayId = "gw-2"; final List viaGateways = List.of(gatewayId, otherGatewayId); // set command handling adapter instance for gateway - svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, span) + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, false, span) .compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); @@ -685,14 +778,14 @@ public void testGetCommandHandlingAdapterInstancesForMultipleSubscribedVias(fina final String otherGatewayId = "gw-2"; final List viaGateways = List.of(gatewayId, otherGatewayId); // set command handling adapter instance for gateway - svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, span) + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, gatewayId, adapterInstance, null, false, span) .compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); }); // set command handling adapter instance for other gateway return svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, otherGatewayId, - otherAdapterInstance, null, span); + otherAdapterInstance, null, false, span); }).compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); @@ -741,7 +834,8 @@ public void testGetCommandHandlingAdapterInstancesFailsForOtherTenantDevice(fina final String otherGatewayId = "gw-2"; final List viaGateways = List.of(gatewayId); // set command handling adapter instance for other gateway - svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, otherGatewayId, adapterInstance, null, span) + svc.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, otherGatewayId, adapterInstance, null, false, + span) .compose(deviceConnectionResult -> { ctx.verify(() -> { assertEquals(HttpURLConnection.HTTP_NO_CONTENT, deviceConnectionResult.getStatus()); diff --git a/site/documentation/content/api/device-connection/index.md b/site/documentation/content/api/device-connection/index.md index 1aa27fcb2f..bd5f0f674f 100644 --- a/site/documentation/content/api/device-connection/index.md +++ b/site/documentation/content/api/device-connection/index.md @@ -129,6 +129,7 @@ The following table provides an overview of the properties a client needs to set | *subject* | yes | *properties* | *string* | MUST be set to `set-cmd-handling-adapter-instance`. | | *adapter_instance_id* | yes | *application-properties* | *string* | The identifier of the protocol adapter instance that currently handles commands for the device or gateway identified by the *device_id* property. | | *lifespan* | no | *application-properties* | *int* | The lifespan of the mapping entry in seconds. After that period, the mapping entry shall be treated as non-existent by the *Device Registration API* methods. A negative value, as well as an omitted property, is interpreted as an unlimited lifespan. | +| *update_only* | no | *application-properties* | *boolean* | If set to `true`, the command will only update an existing mapping entry with the given lifespan, provided such an entry with the given device id and adapter instance id exists. If the property is omitted or set to `false`, the mapping entry will be set regardless whether an entry with the given adapter instance id exists or not. | The body of the message SHOULD be empty and will be ignored if it is not. @@ -140,8 +141,9 @@ The response message's *status* property may contain the following codes: | Code | Description | | :---- | :---------- | -| *204* | OK, the adapter instance for the device has been updated. | -| *400* | Bad Request, the command-handling adapter instance has not been updated due to invalid or missing data in the request. | +| *204* | OK, the command-handling adapter instance for the device has been updated. | +| *400* | Bad Request, the adapter instance for the device has not been set or updated due to invalid or missing data in the request. | +| *412* | Precondition failed, the adapter instance for the device has not been set or updated. This status is returned if the *update_only* property is set to `true` but no matching entry for the given device ID and adapter instance value exists. | Implementors of this API may return a *404* status code in order to indicate that no device with the given identifier exists for the given tenant. However, performing such a check is optional. diff --git a/tests/src/test/java/org/eclipse/hono/tests/jms/JmsBasedDeviceConnectionClient.java b/tests/src/test/java/org/eclipse/hono/tests/jms/JmsBasedDeviceConnectionClient.java index 09d42c8e95..f97b5d0d52 100644 --- a/tests/src/test/java/org/eclipse/hono/tests/jms/JmsBasedDeviceConnectionClient.java +++ b/tests/src/test/java/org/eclipse/hono/tests/jms/JmsBasedDeviceConnectionClient.java @@ -115,6 +115,7 @@ public Future setCommandHandlingAdapterInstance( final String deviceId, final String adapterInstanceId, final Duration lifespan, + final boolean updateOnly, final SpanContext context) { final int lifespanSeconds = lifespan != null && lifespan.getSeconds() <= Integer.MAX_VALUE ? (int) lifespan.getSeconds() : -1; @@ -122,7 +123,8 @@ public Future setCommandHandlingAdapterInstance( DeviceConnectionAction.SET_CMD_HANDLING_ADAPTER_INSTANCE.getSubject(), Map.of(MessageHelper.APP_PROPERTY_DEVICE_ID, deviceId, MessageHelper.APP_PROPERTY_ADAPTER_INSTANCE_ID, adapterInstanceId, - MessageHelper.APP_PROPERTY_LIFESPAN, lifespanSeconds), + MessageHelper.APP_PROPERTY_LIFESPAN, lifespanSeconds, + MessageHelper.APP_PROPERTY_UPDATE_ONLY, updateOnly), null) .onSuccess(payload -> LOGGER.debug("successfully set command-handling adapter instance")) .onFailure(t -> LOGGER.error("failed to set command-handling adapter instance", t)) diff --git a/tests/src/test/java/org/eclipse/hono/tests/registry/DeviceConnectionApiTests.java b/tests/src/test/java/org/eclipse/hono/tests/registry/DeviceConnectionApiTests.java index b69902441c..38926630a8 100644 --- a/tests/src/test/java/org/eclipse/hono/tests/registry/DeviceConnectionApiTests.java +++ b/tests/src/test/java/org/eclipse/hono/tests/registry/DeviceConnectionApiTests.java @@ -118,7 +118,8 @@ public void testSetCommandHandlingAdapterInstanceSucceeds(final VertxTestContext final String adapterInstance = randomId(); getClient(Constants.DEFAULT_TENANT) - .compose(client -> client.setCommandHandlingAdapterInstance(deviceId, adapterInstance, null, null).map(client)) + .compose(client -> client + .setCommandHandlingAdapterInstance(deviceId, adapterInstance, null, false, null).map(client)) .compose(client -> client.getCommandHandlingAdapterInstances(deviceId, List.of(), null)) .setHandler(ctx.succeeding(r -> { ctx.verify(() -> { @@ -148,9 +149,9 @@ public void testGetCommandHandlingAdapterInstancesFailsForExpiredEntry(final Ver final Duration lifespan = Duration.ofSeconds(1); getClient(Constants.DEFAULT_TENANT) - .compose(client -> { - return client.setCommandHandlingAdapterInstance(deviceId, adapterInstance, lifespan, null).map(client); - }) + .compose(client -> client + .setCommandHandlingAdapterInstance(deviceId, adapterInstance, lifespan, false, null) + .map(client)) .compose(client -> { final Promise instancesPromise = Promise.promise(); // wait 1s to make sure that entry has expired after that @@ -166,6 +167,66 @@ public void testGetCommandHandlingAdapterInstancesFailsForExpiredEntry(final Ver })); } + /** + * Verifies that a request to update the command-handling adapter instance for a device succeeds. + * + * @param ctx The vert.x test context. + */ + @Timeout(value = 5, timeUnit = TimeUnit.SECONDS) + @Test + public void testSetCommandHandlingAdapterInstanceWithUpdateOnlyTrueSucceeds(final VertxTestContext ctx) { + + final String deviceId = randomId(); + final String adapterInstance = randomId(); + + getClient(Constants.DEFAULT_TENANT) + // first invocation initially adds the entry + .compose(client -> client + .setCommandHandlingAdapterInstance(deviceId, adapterInstance, null, false, null).map(client)) + // now update the entry + .compose(client -> client + .setCommandHandlingAdapterInstance(deviceId, adapterInstance, null, true, null).map(client)) + .compose(client -> client.getCommandHandlingAdapterInstances(deviceId, List.of(), null)) + .setHandler(ctx.succeeding(r -> { + ctx.verify(() -> { + final JsonArray instanceList = r.getJsonArray(DeviceConnectionConstants.FIELD_ADAPTER_INSTANCES); + assertThat(instanceList).hasSize(1); + final JsonObject instance = instanceList.getJsonObject(0); + assertThat(instance.getString(DeviceConnectionConstants.FIELD_PAYLOAD_DEVICE_ID)).isEqualTo(deviceId); + assertThat(instance.getString(DeviceConnectionConstants.FIELD_ADAPTER_INSTANCE_ID)).isEqualTo(adapterInstance); + }); + ctx.completeNow(); + })); + } + + /** + * Verifies that a request to update the command-handling adapter instance for a device fails if + * the given adapter instance parameter doesn't match the one of the entry registered for the given device. + * + * @param ctx The vert.x test context. + */ + @Timeout(value = 6, timeUnit = TimeUnit.SECONDS) + @Test + public void testSetCommandHandlingAdapterInstanceWithUpdateOnlyTrueFails(final VertxTestContext ctx) { + + final String deviceId = randomId(); + final String adapterInstance = randomId(); + final Duration lifespan = Duration.ofSeconds(1); + + getClient(Constants.DEFAULT_TENANT) + // first invocation initially adds the entry + .compose(client -> client + .setCommandHandlingAdapterInstance(deviceId, adapterInstance, lifespan, false, null) + .map(client)) + // now try to update the entry, but with another adapter instance + .compose(client -> client + .setCommandHandlingAdapterInstance(deviceId, "otherAdapterInstance", lifespan, true, null)) + .setHandler(ctx.failing(t -> { + ctx.verify(() -> assertErrorCode(t, HttpURLConnection.HTTP_PRECON_FAILED)); + ctx.completeNow(); + })); + } + /** * Verifies that a request to get the command-handling adapter instance for a device fails if no * adapter is registered for the device.