Skip to content

Commit

Permalink
Use void future in client removeCommandHandlingAdapterInstance().
Browse files Browse the repository at this point in the history
This reverts #1926 and makes the method behave according to
the API spec again, returning a failed Future if the entry
wasn't removed.

Signed-off-by: Carsten Lohmann <[email protected]>
  • Loading branch information
calohmn committed Oct 20, 2020
1 parent e2f849a commit f2cae32
Show file tree
Hide file tree
Showing 13 changed files with 106 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,14 @@ public Future<Void> setCommandHandlingAdapterInstance(final String deviceId, fin
}

@Override
public Future<Boolean> removeCommandHandlingAdapterInstance(final String deviceId, final String adapterInstanceId,
public Future<Void> removeCommandHandlingAdapterInstance(final String deviceId, final String adapterInstanceId,
final SpanContext context) {
final Span span = newSpan(context, SPAN_NAME_REMOVE_CMD_HANDLING_ADAPTER_INSTANCE);
TracingHelper.setDeviceTags(span, tenantId, deviceId);
span.setTag(MessageHelper.APP_PROPERTY_ADAPTER_INSTANCE_ID, adapterInstanceId);
return finishSpan(cache.removeCommandHandlingAdapterInstance(tenantId, deviceId, adapterInstanceId, span), span);
// skip tracing PRECON_FAILED response status as error (may have been trying to remove an expired entry)
return finishSpan(cache.removeCommandHandlingAdapterInstance(tenantId, deviceId, adapterInstanceId, span), span,
HttpURLConnection.HTTP_PRECON_FAILED);
}

@Override
Expand All @@ -170,13 +172,20 @@ private Span newSpan(final SpanContext parent, final String operationName) {
}

private <T> Future<T> finishSpan(final Future<T> result, final Span span) {
return finishSpan(result, span, null);
}

private <T> Future<T> finishSpan(final Future<T> result, final Span span, final Integer statusToSkipErrorTraceFor) {
return result.recover(t -> {
Tags.HTTP_STATUS.set(span, ServiceInvocationException.extractStatusCode(t));
TracingHelper.logError(span, t);
final int statusCode = ServiceInvocationException.extractStatusCode(t);
Tags.HTTP_STATUS.set(span, statusCode);
if (statusToSkipErrorTraceFor == null || statusCode != statusToSkipErrorTraceFor) {
TracingHelper.logError(span, t);
}
span.finish();
return Future.failedFuture(t);
}).map(resultValue -> {
Tags.HTTP_STATUS.set(span, resultValue != null ? HttpURLConnection.HTTP_OK : HttpURLConnection.HTTP_ACCEPTED);
Tags.HTTP_STATUS.set(span, resultValue != null ? HttpURLConnection.HTTP_OK : HttpURLConnection.HTTP_NO_CONTENT);
span.finish();
return resultValue;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public Future<Void> setCommandHandlingAdapterInstance(final String tenantId, fin
}

@Override
public Future<Boolean> removeCommandHandlingAdapterInstance(final String tenantId, final String deviceId,
public Future<Void> removeCommandHandlingAdapterInstance(final String tenantId, final String deviceId,
final String adapterInstanceId, final Span span) {
Objects.requireNonNull(tenantId);
Objects.requireNonNull(deviceId);
Expand All @@ -182,15 +182,16 @@ public Future<Boolean> removeCommandHandlingAdapterInstance(final String tenantI
tenantId, deviceId, adapterInstanceId, t);
return Future.failedFuture(new ServerErrorException(HttpURLConnection.HTTP_INTERNAL_ERROR, t));
})
.map(removed -> {
.compose(removed -> {
if (!removed) {
LOG.debug("command handling adapter instance was not removed, key not mapped or value didn't match [tenant: {}, device-id: {}, adapter-instance: {}]",
tenantId, deviceId, adapterInstanceId);
return Future.failedFuture(new ClientErrorException(HttpURLConnection.HTTP_PRECON_FAILED));
} else {
LOG.debug("removed command handling adapter instance [tenant: {}, device-id: {}, adapter-instance: {}]",
tenantId, deviceId, adapterInstanceId);
return Future.succeededFuture();
}
return removed;
});

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,13 @@ Future<Void> setCommandHandlingAdapterInstance(String tenantId, String deviceId,
* @param span The active OpenTracing span for this operation. It is not to be closed in this method!
* An implementation should log (error) events on this span and it may set tags and use this span as the
* parent for any spans created in this method.
* @return A future indicating the outcome of the operation, with its value indicating whether the protocol
* adapter instance value was removed or not.
* @return A future indicating the outcome of the operation.
* <p>
* The future will be failed with a {@link org.eclipse.hono.client.ServiceInvocationException} if there
* was an error removing the value.
* @throws NullPointerException if any of the parameters is {@code null}.
* The future will be succeeded if the entry was successfully removed.
* Otherwise the future will be failed with a {@link org.eclipse.hono.client.ServiceInvocationException}.
* @throws NullPointerException if any of the parameters except context is {@code null}.
*/
Future<Boolean> removeCommandHandlingAdapterInstance(String tenantId, String deviceId, String adapterInstanceId, Span span);
Future<Void> removeCommandHandlingAdapterInstance(String tenantId, String deviceId, String adapterInstanceId, Span span);

/**
* Gets information about the adapter instances that can handle a command for the given device.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,14 +256,11 @@ public void testRemoveCommandHandlingAdapterInstanceSucceeds(final VertxTestCont
info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, span)
.compose(v -> info.removeCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId,
adapterInstance, span))
.onComplete(ctx.succeeding(result -> ctx.verify(() -> {
assertThat(result).isTrue();
ctx.completeNow();
})));
.onComplete(ctx.completing());
}

/**
* Verifies that the <em>removeCommandHandlingAdapterInstance</em> operation result is <em>false</em> if
* Verifies that the <em>removeCommandHandlingAdapterInstance</em> operation fails with a PRECON_FAILED status if
* no entry was registered for the device. Only an adapter instance for another device of the tenant was
* registered.
*
Expand All @@ -276,15 +273,16 @@ public void testRemoveCommandHandlingAdapterInstanceFailsForOtherDevice(final Ve
info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, span)
.compose(v -> {
return info.removeCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, "otherDevice", adapterInstance, span);
}).onComplete(ctx.succeeding(result -> ctx.verify(() -> {
assertThat(result).isFalse();
}).onComplete(ctx.failing(t -> ctx.verify(() -> {
assertThat(t).isInstanceOf(ServiceInvocationException.class);
assertThat(((ServiceInvocationException) t).getErrorCode()).isEqualTo(HttpURLConnection.HTTP_PRECON_FAILED);
ctx.completeNow();
})));
}

/**
* Verifies that the <em>removeCommandHandlingAdapterInstance</em> operation result is <em>false</em> if
* the given adapter instance parameter doesn't match the one of the entry registered for the given device.
* Verifies that the <em>removeCommandHandlingAdapterInstance</em> operation fails with a PRECON_FAILED status
* if the given adapter instance parameter doesn't match the one of the entry registered for the given device.
*
* @param ctx The vert.x context.
*/
Expand All @@ -295,8 +293,9 @@ public void testRemoveCommandHandlingAdapterInstanceFailsForOtherAdapterInstance
info.setCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, adapterInstance, null, span)
.compose(v -> {
return info.removeCommandHandlingAdapterInstance(Constants.DEFAULT_TENANT, deviceId, "otherAdapterInstance", span);
}).onComplete(ctx.succeeding(result -> ctx.verify(() -> {
assertThat(result).isFalse();
}).onComplete(ctx.failing(t -> ctx.verify(() -> {
assertThat(t).isInstanceOf(ServiceInvocationException.class);
assertThat(((ServiceInvocationException) t).getErrorCode()).isEqualTo(HttpURLConnection.HTTP_PRECON_FAILED);
ctx.completeNow();
})));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,13 @@ Future<Void> setCommandHandlingAdapterInstance(String deviceId, String adapterIn
* @param context The currently active OpenTracing span context or {@code null} if no span is currently active.
* An implementation should use this as the parent for any span it creates for tracing
* the execution of this operation.
* @return A future indicating the outcome of the operation, with its value indicating whether the protocol
* adapter instance value was removed or not.
* <p>
* NOTE: this method maps an outcome with status 404 or 412 as defined in the
* <a href="https://www.eclipse.org/hono/docs/api/device-connection/">Device Connection API
* specification</a> to a succeeded future with value {@code false} here.
* @return A future indicating the outcome of the operation.
* <p>
* The future will be failed with a {@link org.eclipse.hono.client.ServiceInvocationException} if there
* was an error removing the value.
* The future will be succeeded if the entry was successfully removed.
* Otherwise the future will be failed with a {@link org.eclipse.hono.client.ServiceInvocationException}.
* @throws NullPointerException if device id or adapter instance id is {@code null}.
*/
Future<Boolean> removeCommandHandlingAdapterInstance(String deviceId, String adapterInstanceId, SpanContext context);
Future<Void> removeCommandHandlingAdapterInstance(String deviceId, String adapterInstanceId, SpanContext context);

/**
* Gets information about the adapter instances that can handle a command for the given device.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.eclipse.hono.client.DeviceConnectionClient;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.SendMessageSampler;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.StatusCodeMapper;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.CacheDirective;
Expand All @@ -37,6 +38,7 @@

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.tag.Tags;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
Expand Down Expand Up @@ -242,7 +244,7 @@ public Future<JsonObject> getLastKnownGatewayForDevice(final String deviceId, fi
}

@Override
public Future<Boolean> removeCommandHandlingAdapterInstance(final String deviceId, final String adapterInstanceId, final SpanContext context) {
public Future<Void> removeCommandHandlingAdapterInstance(final String deviceId, final String adapterInstanceId, final SpanContext context) {
Objects.requireNonNull(deviceId);
Objects.requireNonNull(adapterInstanceId);

Expand All @@ -261,17 +263,26 @@ public Future<Boolean> removeCommandHandlingAdapterInstance(final String deviceI
resultTracker,
null,
currentSpan);
return mapResultAndFinishSpan(resultTracker.future(), result -> {
switch (result.getStatus()) {
case HttpURLConnection.HTTP_NO_CONTENT:
return Boolean.TRUE;
case HttpURLConnection.HTTP_NOT_FOUND:
case HttpURLConnection.HTTP_PRECON_FAILED:
return Boolean.FALSE;
default:
throw StatusCodeMapper.from(result);
// not using mapResultAndFinishSpan() in order to skip logging PRECON_FAILED result as error (may have been trying to remove an expired entry)
return resultTracker.future().recover(t -> {
Tags.HTTP_STATUS.set(currentSpan, ServiceInvocationException.extractStatusCode(t));
TracingHelper.logError(currentSpan, t);
currentSpan.finish();
return Future.failedFuture(t);
}).map(resultValue -> {
Tags.HTTP_STATUS.set(currentSpan, resultValue.getStatus());
if (resultValue.isError() && resultValue.getStatus() != HttpURLConnection.HTTP_PRECON_FAILED) {
Tags.ERROR.set(currentSpan, Boolean.TRUE);
}
}, currentSpan);
currentSpan.finish();

switch (resultValue.getStatus()) {
case HttpURLConnection.HTTP_NO_CONTENT:
return null;
default:
throw StatusCodeMapper.from(resultValue);
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.eclipse.hono.client.ProtocolAdapterCommandConsumerFactory;
import org.eclipse.hono.client.SendMessageSampler;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.util.AddressHelper;
import org.eclipse.hono.util.CommandConstants;
import org.eclipse.hono.util.Constants;
Expand Down Expand Up @@ -228,26 +229,30 @@ private Future<Void> removeCommandConsumer(final CommandHandlerWrapper commandHa
.compose(client -> client.removeCommandHandlingAdapterInstance(deviceId, adapterInstanceId,
onCloseSpanContext))
.recover(thr -> {
log.warn("error removing command handling adapter instance [tenant: {}, device: {}]", tenantId,
deviceId, thr);
return Future.failedFuture(thr);
})
.compose(removed -> {
final boolean entryNotExpired = lifespan.isNegative() || Instant.now().isBefore(lifespanStart.plus(lifespan));
if (!removed && entryNotExpired) {
// entry wasn't actually removed and entry hasn't expired (yet);
// This case happens when 2 consecutive command subscription requests from the same device
// (with no intermittent disconnect/unsubscribe - possibly because of a broken connection in between)
// have reached *different* protocol adapter instances/verticles. Now calling 'removeCommandHandlingAdapterInstance'
// on the 1st subscription fails because of the non-matching adapterInstanceId parameter.
// Throwing an explicit exception here will enable the protocol adapter to detect this case
// and skip sending an (incorrect) "disconnectedTtd" event message.
log.debug("command handling adapter instance not removed - not matched or already removed [tenant: {}, device: {}]",
tenantId, deviceId);
return Future.failedFuture(new ClientErrorException(HttpURLConnection.HTTP_PRECON_FAILED,
"no matching command consumer mapping found to be removed"));
if (ServiceInvocationException.extractStatusCode(thr) == HttpURLConnection.HTTP_PRECON_FAILED) {
final boolean entryMayHaveExpired = !lifespan.isNegative() && Instant.now().isAfter(lifespanStart.plus(lifespan));
if (entryMayHaveExpired) {
log.trace("ignoring 412 error when removing command handling adapter instance; entry may have already expired [tenant: {}, device: {}]",
tenantId, deviceId);
return Future.succeededFuture();
} else {
// entry wasn't actually removed and entry hasn't expired (yet);
// This case happens when 2 consecutive command subscription requests from the same device
// (with no intermittent disconnect/unsubscribe - possibly because of a broken connection in between)
// have reached *different* protocol adapter instances/verticles. Now calling 'removeCommandHandlingAdapterInstance'
// on the 1st subscription fails because of the non-matching adapterInstanceId parameter.
// Throwing an explicit exception here will enable the protocol adapter to detect this case
// and skip sending an (incorrect) "disconnectedTtd" event message.
log.debug("command handling adapter instance not removed - not matched or already removed [tenant: {}, device: {}]",
tenantId, deviceId);
return Future.failedFuture(new ClientErrorException(HttpURLConnection.HTTP_PRECON_FAILED,
"no matching command consumer mapping found to be removed"));
}
} else {
log.info("error removing command handling adapter instance [tenant: {}, device: {}]", tenantId,
deviceId, thr);
return Future.failedFuture(thr);
}
return Future.succeededFuture((Void) null);
});
}

Expand Down
Loading

0 comments on commit f2cae32

Please sign in to comment.