Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#1858] Skip removeCommandHandlingAdapterInstance if entry has expired #1922

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -201,30 +201,30 @@ private Future<Void> removeCommandConsumer(final String tenantId, final String d
log.trace("remove command consumer [tenant-id: {}, device-id: {}]", tenantId, deviceId);
adapterInstanceCommandHandler.removeDeviceSpecificCommandHandler(tenantId, deviceId);

final boolean lifespanReached = !lifespan.isNegative() && Instant.now().isAfter(lifespanStart.plus(lifespan));
// TODO when making handling of the lifespan property mandatory for implementors of the Device Connection API,
// removing the adapter instance can be skipped here if 'lifespanReached' is true
return deviceConnectionClientFactory.getOrCreateDeviceConnectionClient(tenantId)
.compose(client -> {
// The given span context from the createCommandConsumer invocation is only used here if a non-unlimited lifespan is set
// meaning creating and removing the consumer will probably happen in a timespan short enough for one overall trace.
// The span context isn't used however if lifespanReached is true since the below 'remove' operation will usually (but not necessarily)
// result in a "not found" error in that case, which is expected and should not cause the overall trace to be marked with an error.
final SpanContext context = !lifespan.isNegative() && !lifespanReached ? createCommandConsumerSpanContext : null;
return client.removeCommandHandlingAdapterInstance(deviceId, adapterInstanceId, context);
}).recover(thr -> {
if (lifespanReached && thr instanceof ServiceInvocationException
&& ((ServiceInvocationException) thr).getErrorCode() == HttpURLConnection.HTTP_NOT_FOUND) {
// entry was not found, meaning it has expired
log.trace("ignoring 404 error when removing command handling adapter instance; entry has already expired [tenant: {}, device: {}]",
tenantId, deviceId);
return Future.succeededFuture();
} else {
log.info("error removing command handling adapter instance [tenant: {}, device: {}]", tenantId,
deviceId, thr);
return Future.failedFuture(thr);
}
});
if (!lifespan.isNegative() && Instant.now().isAfter(lifespanStart.plus(lifespan))) {
// command handling adapter instance entry already has expired, no explicit removal necessary
return Future.succeededFuture();
} else {
return deviceConnectionClientFactory.getOrCreateDeviceConnectionClient(tenantId)
.compose(client -> {
// The given span context from the createCommandConsumer invocation is only used here if a non-unlimited lifespan is set
// meaning creating and removing the consumer will probably happen in a timespan short enough for one overall trace.
final SpanContext context = !lifespan.isNegative() ? createCommandConsumerSpanContext : null;
return client.removeCommandHandlingAdapterInstance(deviceId, adapterInstanceId, context);
}).recover(thr -> {
if (thr instanceof ServiceInvocationException
&& ((ServiceInvocationException) thr).getErrorCode() == HttpURLConnection.HTTP_NOT_FOUND) {
// entry was not found, it could have expired in the meantime or could have been updated with another adapter instance
log.debug("ignoring 404 error when removing command handling adapter instance [tenant: {}, device: {}]",
tenantId, deviceId);
return Future.succeededFuture();
} else {
log.error("error removing command handling adapter instance [tenant: {}, device: {}]", tenantId,
deviceId, thr);
return Future.failedFuture(thr);
}
});
}
}

private Future<MessageConsumer> getOrCreateMappingAndDelegatingCommandConsumer(final String tenantId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -222,4 +223,39 @@ public void testCreateTimeLimitedCommandConsumerSucceeds(final VertxTestContext
}));
}

/**
* Verifies that closing an already expired command consumer skips removal of the
* mapping entry in the device connection service.
*
* @param vertx The vert.x instance.
* @param ctx The test context.
*/
@Test
public void testCloseExpiredCommandConsumer(final Vertx vertx, final VertxTestContext ctx) {

final Handler<CommandContext> commandHandler = VertxMockSupport.mockHandler();

final Duration lifespan = Duration.ofMillis(1);
final Future<MessageConsumer> commandConsumerFuture = commandConsumerFactory.createCommandConsumer(tenantId,
deviceId, commandHandler, lifespan, null);
commandConsumerFuture.setHandler(ctx.succeeding(consumer -> {
ctx.verify(() -> {
verify(connection).createReceiver(eq(tenantCommandAddress), eq(ProtonQoS.AT_LEAST_ONCE), any(), anyInt(),
eq(false), any());
verify(devConClient).setCommandHandlingAdapterInstance(eq(deviceId), anyString(), any(), any());
// wait until lifespan period has elapsed
vertx.setTimer(10, tid -> {
// verify closing the consumer is successful
consumer.close(ctx.succeeding(v -> {
ctx.verify(() -> {
// verify command handling adapter instance isn't explicitly removed (since lifespan has already elapsed)
verify(devConClient, never()).removeCommandHandlingAdapterInstance(eq(deviceId), anyString(), any());
});
ctx.completeNow();
}));
});
});
}));
}

}