diff --git a/client/src/main/java/org/eclipse/hono/client/impl/ProtocolAdapterCommandConsumerFactoryImpl.java b/client/src/main/java/org/eclipse/hono/client/impl/ProtocolAdapterCommandConsumerFactoryImpl.java index eda0cd53b7..ad503f0b3d 100644 --- a/client/src/main/java/org/eclipse/hono/client/impl/ProtocolAdapterCommandConsumerFactoryImpl.java +++ b/client/src/main/java/org/eclipse/hono/client/impl/ProtocolAdapterCommandConsumerFactoryImpl.java @@ -201,30 +201,30 @@ private Future removeCommandConsumer(final String tenantId, final String d log.trace("remove command consumer [tenant-id: {}, device-id: {}]", tenantId, deviceId); adapterInstanceCommandHandler.removeDeviceSpecificCommandHandler(tenantId, deviceId); - final boolean lifespanReached = !lifespan.isNegative() && Instant.now().isAfter(lifespanStart.plus(lifespan)); - // TODO when making handling of the lifespan property mandatory for implementors of the Device Connection API, - // removing the adapter instance can be skipped here if 'lifespanReached' is true - return deviceConnectionClientFactory.getOrCreateDeviceConnectionClient(tenantId) - .compose(client -> { - // The given span context from the createCommandConsumer invocation is only used here if a non-unlimited lifespan is set - // meaning creating and removing the consumer will probably happen in a timespan short enough for one overall trace. - // The span context isn't used however if lifespanReached is true since the below 'remove' operation will usually (but not necessarily) - // result in a "not found" error in that case, which is expected and should not cause the overall trace to be marked with an error. - final SpanContext context = !lifespan.isNegative() && !lifespanReached ? createCommandConsumerSpanContext : null; - return client.removeCommandHandlingAdapterInstance(deviceId, adapterInstanceId, context); - }).recover(thr -> { - if (lifespanReached && thr instanceof ServiceInvocationException - && ((ServiceInvocationException) thr).getErrorCode() == HttpURLConnection.HTTP_NOT_FOUND) { - // entry was not found, meaning it has expired - log.trace("ignoring 404 error when removing command handling adapter instance; entry has already expired [tenant: {}, device: {}]", - tenantId, deviceId); - return Future.succeededFuture(); - } else { - log.info("error removing command handling adapter instance [tenant: {}, device: {}]", tenantId, - deviceId, thr); - return Future.failedFuture(thr); - } - }); + if (!lifespan.isNegative() && Instant.now().isAfter(lifespanStart.plus(lifespan))) { + // command handling adapter instance entry already has expired, no explicit removal necessary + return Future.succeededFuture(); + } else { + return deviceConnectionClientFactory.getOrCreateDeviceConnectionClient(tenantId) + .compose(client -> { + // The given span context from the createCommandConsumer invocation is only used here if a non-unlimited lifespan is set + // meaning creating and removing the consumer will probably happen in a timespan short enough for one overall trace. + final SpanContext context = !lifespan.isNegative() ? createCommandConsumerSpanContext : null; + return client.removeCommandHandlingAdapterInstance(deviceId, adapterInstanceId, context); + }).recover(thr -> { + if (thr instanceof ServiceInvocationException + && ((ServiceInvocationException) thr).getErrorCode() == HttpURLConnection.HTTP_NOT_FOUND) { + // entry was not found, it could have expired in the meantime or could have been updated with another adapter instance + log.debug("ignoring 404 error when removing command handling adapter instance [tenant: {}, device: {}]", + tenantId, deviceId); + return Future.succeededFuture(); + } else { + log.error("error removing command handling adapter instance [tenant: {}, device: {}]", tenantId, + deviceId, thr); + return Future.failedFuture(thr); + } + }); + } } private Future getOrCreateMappingAndDelegatingCommandConsumer(final String tenantId) { diff --git a/client/src/test/java/org/eclipse/hono/client/impl/ProtocolAdapterCommandConsumerFactoryImplTest.java b/client/src/test/java/org/eclipse/hono/client/impl/ProtocolAdapterCommandConsumerFactoryImplTest.java index f1adb6787a..4c72cd7bda 100644 --- a/client/src/test/java/org/eclipse/hono/client/impl/ProtocolAdapterCommandConsumerFactoryImplTest.java +++ b/client/src/test/java/org/eclipse/hono/client/impl/ProtocolAdapterCommandConsumerFactoryImplTest.java @@ -22,6 +22,7 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -222,4 +223,39 @@ public void testCreateTimeLimitedCommandConsumerSucceeds(final VertxTestContext })); } + /** + * Verifies that closing an already expired command consumer skips removal of the + * mapping entry in the device connection service. + * + * @param vertx The vert.x instance. + * @param ctx The test context. + */ + @Test + public void testCloseExpiredCommandConsumer(final Vertx vertx, final VertxTestContext ctx) { + + final Handler commandHandler = VertxMockSupport.mockHandler(); + + final Duration lifespan = Duration.ofMillis(1); + final Future commandConsumerFuture = commandConsumerFactory.createCommandConsumer(tenantId, + deviceId, commandHandler, lifespan, null); + commandConsumerFuture.setHandler(ctx.succeeding(consumer -> { + ctx.verify(() -> { + verify(connection).createReceiver(eq(tenantCommandAddress), eq(ProtonQoS.AT_LEAST_ONCE), any(), anyInt(), + eq(false), any()); + verify(devConClient).setCommandHandlingAdapterInstance(eq(deviceId), anyString(), any(), any()); + // wait until lifespan period has elapsed + vertx.setTimer(10, tid -> { + // verify closing the consumer is successful + consumer.close(ctx.succeeding(v -> { + ctx.verify(() -> { + // verify command handling adapter instance isn't explicitly removed (since lifespan has already elapsed) + verify(devConClient, never()).removeCommandHandlingAdapterInstance(eq(deviceId), anyString(), any()); + }); + ctx.completeNow(); + })); + }); + }); + })); + } + }