Skip to content

Commit

Permalink
[eclipse-hono#1858] Skip removeCommandHandlingAdapterInstance if expi…
Browse files Browse the repository at this point in the history
…red.

With the 'lifespan' parameter of the 'setCommandHandlingAdapterInstance'
operation being mandatory to implement now for Device Connection
API implementations, the explicit removal of a command handling
adapter instance entry can be skipped if the entry has expired.

Signed-off-by: Carsten Lohmann <[email protected]>
  • Loading branch information
calohmn committed Apr 23, 2020
1 parent d99f19e commit 2ce6e07
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -201,30 +201,30 @@ private Future<Void> removeCommandConsumer(final String tenantId, final String d
log.trace("remove command consumer [tenant-id: {}, device-id: {}]", tenantId, deviceId);
adapterInstanceCommandHandler.removeDeviceSpecificCommandHandler(tenantId, deviceId);

final boolean lifespanReached = !lifespan.isNegative() && Instant.now().isAfter(lifespanStart.plus(lifespan));
// TODO when making handling of the lifespan property mandatory for implementors of the Device Connection API,
// removing the adapter instance can be skipped here if 'lifespanReached' is true
return deviceConnectionClientFactory.getOrCreateDeviceConnectionClient(tenantId)
.compose(client -> {
// The given span context from the createCommandConsumer invocation is only used here if a non-unlimited lifespan is set
// meaning creating and removing the consumer will probably happen in a timespan short enough for one overall trace.
// The span context isn't used however if lifespanReached is true since the below 'remove' operation will usually (but not necessarily)
// result in a "not found" error in that case, which is expected and should not cause the overall trace to be marked with an error.
final SpanContext context = !lifespan.isNegative() && !lifespanReached ? createCommandConsumerSpanContext : null;
return client.removeCommandHandlingAdapterInstance(deviceId, adapterInstanceId, context);
}).recover(thr -> {
if (lifespanReached && thr instanceof ServiceInvocationException
&& ((ServiceInvocationException) thr).getErrorCode() == HttpURLConnection.HTTP_NOT_FOUND) {
// entry was not found, meaning it has expired
log.trace("ignoring 404 error when removing command handling adapter instance; entry has already expired [tenant: {}, device: {}]",
tenantId, deviceId);
return Future.succeededFuture();
} else {
log.info("error removing command handling adapter instance [tenant: {}, device: {}]", tenantId,
deviceId, thr);
return Future.failedFuture(thr);
}
});
if (!lifespan.isNegative() && Instant.now().isAfter(lifespanStart.plus(lifespan))) {
// command handling adapter instance entry already has expired, no explicit removal necessary
return Future.succeededFuture();
} else {
return deviceConnectionClientFactory.getOrCreateDeviceConnectionClient(tenantId)
.compose(client -> {
// The given span context from the createCommandConsumer invocation is only used here if a non-unlimited lifespan is set
// meaning creating and removing the consumer will probably happen in a timespan short enough for one overall trace.
final SpanContext context = !lifespan.isNegative() ? createCommandConsumerSpanContext : null;
return client.removeCommandHandlingAdapterInstance(deviceId, adapterInstanceId, context);
}).recover(thr -> {
if (thr instanceof ServiceInvocationException
&& ((ServiceInvocationException) thr).getErrorCode() == HttpURLConnection.HTTP_NOT_FOUND) {
// entry was not found, it could have expired in the meantime or could have been updated with another adapter instance
log.debug("ignoring 404 error when removing command handling adapter instance [tenant: {}, device: {}]",
tenantId, deviceId);
return Future.succeededFuture();
} else {
log.error("error removing command handling adapter instance [tenant: {}, device: {}]", tenantId,
deviceId, thr);
return Future.failedFuture(thr);
}
});
}
}

private Future<MessageConsumer> getOrCreateMappingAndDelegatingCommandConsumer(final String tenantId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -222,4 +223,39 @@ public void testCreateTimeLimitedCommandConsumerSucceeds(final VertxTestContext
}));
}

/**
* Verifies that closing an already expired command consumer skips removal of the
* mapping entry in the device connection service.
*
* @param vertx The vert.x instance.
* @param ctx The test context.
*/
@Test
public void testCloseExpiredCommandConsumer(final Vertx vertx, final VertxTestContext ctx) {

final Handler<CommandContext> commandHandler = VertxMockSupport.mockHandler();

final Duration lifespan = Duration.ofMillis(1);
final Future<MessageConsumer> commandConsumerFuture = commandConsumerFactory.createCommandConsumer(tenantId,
deviceId, commandHandler, lifespan, null);
commandConsumerFuture.setHandler(ctx.succeeding(consumer -> {
ctx.verify(() -> {
verify(connection).createReceiver(eq(tenantCommandAddress), eq(ProtonQoS.AT_LEAST_ONCE), any(), anyInt(),
eq(false), any());
verify(devConClient).setCommandHandlingAdapterInstance(eq(deviceId), anyString(), any(), any());
// wait until lifespan period has elapsed
vertx.setTimer(10, tid -> {
// verify closing the consumer is successful
consumer.close(ctx.succeeding(v -> {
ctx.verify(() -> {
// verify command handling adapter instance isn't explicitly removed (since lifespan has already elapsed)
verify(devConClient, never()).removeCommandHandlingAdapterInstance(eq(deviceId), anyString(), any());
});
ctx.completeNow();
}));
});
});
}));
}

}

0 comments on commit 2ce6e07

Please sign in to comment.