Skip to content

Commit

Permalink
[eclipse-hono#1858] Use ttd as command consumer lifespan.
Browse files Browse the repository at this point in the history
Signed-off-by: Carsten Lohmann <[email protected]>
  • Loading branch information
calohmn committed Apr 22, 2020
1 parent 3dca60d commit 27fca73
Show file tree
Hide file tree
Showing 14 changed files with 117 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -864,10 +864,10 @@ private Future<MessageConsumer> createCommandConsumer(
if (authenticatedDevice != null && !authenticatedDevice.getDeviceId().equals(sourceAddress.getResourceId())) {
// gateway scenario
return getCommandConsumerFactory().createCommandConsumer(sourceAddress.getTenantId(),
sourceAddress.getResourceId(), authenticatedDevice.getDeviceId(), commandHandler, span.context());
sourceAddress.getResourceId(), authenticatedDevice.getDeviceId(), commandHandler, -1, span.context());
} else {
return getCommandConsumerFactory().createCommandConsumer(sourceAddress.getTenantId(),
sourceAddress.getResourceId(), commandHandler, span.context());
sourceAddress.getResourceId(), commandHandler, -1, span.context());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
Expand Down Expand Up @@ -455,7 +456,7 @@ public void testAdapterOpensSenderLinkAndNotifyDownstreamApplication() {
// WHEN an unauthenticated device opens a receiver link with a valid source address
final ProtonConnection deviceConnection = mock(ProtonConnection.class);
when(deviceConnection.attachments()).thenReturn(mock(Record.class));
when(commandConsumerFactory.createCommandConsumer(eq(TEST_TENANT_ID), eq(TEST_DEVICE), any(Handler.class), any()))
when(commandConsumerFactory.createCommandConsumer(eq(TEST_TENANT_ID), eq(TEST_DEVICE), any(Handler.class), anyInt(), any()))
.thenReturn(Future.succeededFuture(mock(MessageConsumer.class)));
final String sourceAddress = String.format("%s/%s/%s", getCommandEndpoint(), TEST_TENANT_ID, TEST_DEVICE);
final ProtonSender sender = getSender(sourceAddress);
Expand Down Expand Up @@ -488,7 +489,7 @@ public void testAdapterClosesCommandConsumerWhenDeviceClosesReceiverLink() {

// and a device that wants to receive commands
final MessageConsumer commandConsumer = mock(MessageConsumer.class);
when(commandConsumerFactory.createCommandConsumer(eq(TEST_TENANT_ID), eq(TEST_DEVICE), any(Handler.class), any()))
when(commandConsumerFactory.createCommandConsumer(eq(TEST_TENANT_ID), eq(TEST_DEVICE), any(Handler.class), anyInt(), any()))
.thenReturn(Future.succeededFuture(commandConsumer));
final String sourceAddress = String.format("%s", getCommandEndpoint());
final ProtonSender sender = getSender(sourceAddress);
Expand Down Expand Up @@ -588,7 +589,7 @@ private void testAdapterClosesCommandConsumer(

// that wants to receive commands
final MessageConsumer commandConsumer = mock(MessageConsumer.class);
when(commandConsumerFactory.createCommandConsumer(eq(TEST_TENANT_ID), eq(TEST_DEVICE), any(Handler.class), any()))
when(commandConsumerFactory.createCommandConsumer(eq(TEST_TENANT_ID), eq(TEST_DEVICE), any(Handler.class), anyInt(), any()))
.thenReturn(Future.succeededFuture(commandConsumer));
final String sourceAddress = getCommandEndpoint();
final ProtonSender sender = getSender(sourceAddress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -983,12 +983,14 @@ protected final Future<MessageConsumer> createCommandConsumer(
deviceId,
gatewayId,
commandHandler,
ttdSecs,
currentSpan.context());
} else {
commandConsumerFuture = getCommandConsumerFactory().createCommandConsumer(
tenantObject.getTenantId(),
deviceId,
commandHandler,
ttdSecs,
currentSpan.context());
}
return commandConsumerFuture
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@ public void setup() {
commandConsumer = mock(MessageConsumer.class);
commandConsumerFactory = mock(ProtocolAdapterCommandConsumerFactory.class);
when(commandConsumerFactory.connect()).thenReturn(Future.succeededFuture(mock(HonoConnection.class)));
when(commandConsumerFactory.createCommandConsumer(anyString(), anyString(), any(Handler.class), any()))
when(commandConsumerFactory.createCommandConsumer(anyString(), anyString(), any(Handler.class), anyInt(), any()))
.thenReturn(Future.succeededFuture(commandConsumer));
when(commandConsumerFactory.createCommandConsumer(anyString(), anyString(), anyString(), any(Handler.class), any()))
when(commandConsumerFactory.createCommandConsumer(anyString(), anyString(), anyString(), any(Handler.class), anyInt(), any()))
.thenReturn(Future.succeededFuture(commandConsumer));

deviceConnectionClientFactory = mock(DeviceConnectionClientFactory.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1057,12 +1057,14 @@ protected final Future<MessageConsumer> createCommandConsumer(
deviceId,
gatewayId,
commandHandler,
ttdSecs,
currentSpan.context());
} else {
commandConsumerFuture = getCommandConsumerFactory().createCommandConsumer(
tenantObject.getTenantId(),
deviceId,
commandHandler,
ttdSecs,
currentSpan.context());
}
return commandConsumerFuture
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public void setup() {
}).when(commandConsumer).close(any(Handler.class));
commandConsumerFactory = mock(ProtocolAdapterCommandConsumerFactory.class);
when(commandConsumerFactory.connect()).thenReturn(Future.succeededFuture(mock(HonoConnection.class)));
when(commandConsumerFactory.createCommandConsumer(anyString(), anyString(), any(Handler.class), any()))
when(commandConsumerFactory.createCommandConsumer(anyString(), anyString(), any(Handler.class), anyInt(), any()))
.thenReturn(Future.succeededFuture(commandConsumer));

commandTargetMapper = mock(CommandTargetMapper.class);
Expand Down Expand Up @@ -284,7 +284,7 @@ public void testUploadTelemetryFailsForDisabledTenant() {
// THEN the device gets a 403
assertContextFailedWithClientError(ctx, HttpURLConnection.HTTP_FORBIDDEN);
// and no Command consumer has been created for the device
verify(commandConsumerFactory, never()).createCommandConsumer(anyString(), anyString(), any(Handler.class), any());
verify(commandConsumerFactory, never()).createCommandConsumer(anyString(), anyString(), any(Handler.class), anyInt(), any());
// and the message has not been forwarded downstream
verify(sender, never()).send(any(Message.class));
// and has not been reported as processed
Expand Down Expand Up @@ -337,7 +337,7 @@ public void testUploadTelemetryFailsForUnknownDevice() {
// and the message has not been forwarded downstream
verify(sender, never()).send(any(Message.class), any(SpanContext.class));
// and no Command consumer has been created for the device
verify(commandConsumerFactory, never()).createCommandConsumer(anyString(), anyString(), any(Handler.class), any());
verify(commandConsumerFactory, never()).createCommandConsumer(anyString(), anyString(), any(Handler.class), anyInt(), any());
// and has not been reported as processed
verify(metrics, never())
.reportTelemetry(
Expand Down Expand Up @@ -742,7 +742,7 @@ public void testUploadTelemetryWithTtdClosesCommandConsumerIfSenderCreationFaile
new ServerErrorException(HttpURLConnection.HTTP_UNAVAILABLE, "not connected")));
// and the creation of the command consumer completes at a later point
final Promise<MessageConsumer> commandConsumerPromise = Promise.promise();
when(commandConsumerFactory.createCommandConsumer(anyString(), anyString(), any(Handler.class), any()))
when(commandConsumerFactory.createCommandConsumer(anyString(), anyString(), any(Handler.class), anyInt(), any()))
.thenReturn(commandConsumerPromise.future());

adapter.uploadTelemetryMessage(ctx, "tenant", "device");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package org.eclipse.hono.adapter.http.impl;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
Expand Down Expand Up @@ -237,7 +238,7 @@ public void setUp(final TestInfo testInfo) {
}
return null;
}).when(commandConsumer).close(any(Handler.class));
when(commandConsumerFactory.createCommandConsumer(anyString(), anyString(), any(Handler.class), any())).
when(commandConsumerFactory.createCommandConsumer(anyString(), anyString(), any(Handler.class), anyInt(), any())).
thenReturn(Future.succeededFuture(commandConsumer));

telemetrySender = mock(DownstreamSender.class);
Expand Down Expand Up @@ -490,7 +491,7 @@ public void testPostTelemetryWithTtdSucceedsWithCommandInResponse(final VertxTes
final Command pendingCommand = Command.from(msg, "DEFAULT_TENANT", "device_1");
final CommandContext commandContext = CommandContext.from(pendingCommand, mock(ProtonDelivery.class), mock(Span.class));
final MessageConsumer commandConsumer = mock(MessageConsumer.class);
when(commandConsumerFactory.createCommandConsumer(eq("DEFAULT_TENANT"), eq("device_1"), any(Handler.class), any())).
when(commandConsumerFactory.createCommandConsumer(eq("DEFAULT_TENANT"), eq("device_1"), any(Handler.class), anyInt(), any())).
thenAnswer(invocation -> {
final Handler<CommandContext> consumer = invocation.getArgument(2);
consumer.handle(commandContext);
Expand Down Expand Up @@ -518,7 +519,7 @@ public void testPostTelemetryWithTtdSucceedsWithCommandInResponse(final VertxTes
.sendJsonObject(new JsonObject(), ctx.succeeding(r -> {
ctx.verify(() -> {
verify(commandConsumerFactory).createCommandConsumer(eq("DEFAULT_TENANT"), eq("device_1"),
any(Handler.class), any());
any(Handler.class), anyInt(), any());
// and the command consumer has been closed again
verify(commandConsumer).close(any());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ private Future<MessageConsumer> startLoraCommandConsumer(final String tenantId)
tenantId,
LORA_COMMAND_CONSUMER_DEVICE_ID,
receivedCommandContext -> commandConsumer(tenantId, receivedCommandContext),
-1,
null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -774,10 +774,10 @@ private Future<MessageConsumer> createCommandConsumer(final MqttEndpoint mqttEnd
if (sub.isGatewaySubscriptionForSpecificDevice()) {
// gateway scenario
return getCommandConsumerFactory().createCommandConsumer(sub.getTenant(), sub.getDeviceId(),
sub.getAuthenticatedDeviceId(), commandHandler, span.context());
sub.getAuthenticatedDeviceId(), commandHandler, -1, span.context());
} else {
return getCommandConsumerFactory().createCommandConsumer(sub.getTenant(), sub.getDeviceId(), commandHandler,
span.context());
-1, span.context());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,7 @@ private void testOnSubscribeRegistersAndClosesConnection(final MqttQoS qos) {

// WHEN a device subscribes to commands
final MessageConsumer commandConsumer = mock(MessageConsumer.class);
when(commandConsumerFactory.createCommandConsumer(eq("tenant"), eq("deviceId"), any(Handler.class), any()))
when(commandConsumerFactory.createCommandConsumer(eq("tenant"), eq("deviceId"), any(Handler.class), anyInt(), any()))
.thenReturn(Future.succeededFuture(commandConsumer));
final List<MqttTopicSubscription> subscriptions = Collections.singletonList(
newMockTopicSubscription(getCommandSubscriptionTopic("tenant", "deviceId"), qos));
Expand All @@ -929,7 +929,7 @@ private void testOnSubscribeRegistersAndClosesConnection(final MqttQoS qos) {
adapter.onSubscribe(endpoint, null, msg, cmdHandler, OptionalInt.empty());

// THEN the adapter creates a command consumer that is checked periodically
verify(commandConsumerFactory).createCommandConsumer(eq("tenant"), eq("deviceId"), any(Handler.class), any());
verify(commandConsumerFactory).createCommandConsumer(eq("tenant"), eq("deviceId"), any(Handler.class), anyInt(), any());
// and the adapter registers a hook on the connection to the device
final ArgumentCaptor<Handler<Void>> closeHookCaptor = ArgumentCaptor.forClass(Handler.class);
verify(endpoint).closeHandler(closeHookCaptor.capture());
Expand Down Expand Up @@ -965,7 +965,7 @@ public void testOnSubscribeIncludesStatusCodeForEachFilter() {
subscriptions.add(newMockTopicSubscription("bumlux/+/+/#", MqttQoS.AT_MOST_ONCE));
subscriptions.add(newMockTopicSubscription("bumlux/+/+/#", MqttQoS.AT_MOST_ONCE));
// and for subscribing to commands
when(commandConsumerFactory.createCommandConsumer(eq("tenant-1"), eq("device-A"), any(Handler.class), any()))
when(commandConsumerFactory.createCommandConsumer(eq("tenant-1"), eq("device-A"), any(Handler.class), anyInt(), any()))
.thenReturn(Future.succeededFuture(mock(MessageConsumer.class)));
subscriptions.add(
newMockTopicSubscription(getCommandSubscriptionTopic("tenant-1", "device-A"), MqttQoS.AT_MOST_ONCE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ void initialize(CommandTargetMapper commandTargetMapper,
* @param commandHandler The handler to invoke with every command received. The handler must invoke one of the
* terminal methods of the passed in {@link CommandContext} in order to settle the command
* message transfer and finish the trace span associated with the {@link CommandContext}.
* @param lifespanSeconds The time period in which the command consumer shall be active. A negative value is
* interpreted as an unlimited lifespan.
* @param context The currently active OpenTracing span context or {@code null} if no span is currently active.
* An implementation should use this as the parent for any span it creates for tracing
* the execution of this operation.
Expand All @@ -83,6 +85,7 @@ Future<MessageConsumer> createCommandConsumer(
String tenantId,
String deviceId,
Handler<CommandContext> commandHandler,
int lifespanSeconds,
SpanContext context);

/**
Expand All @@ -104,6 +107,8 @@ Future<MessageConsumer> createCommandConsumer(
* @param commandHandler The handler to invoke with every command received. The handler must invoke one of the
* terminal methods of the passed in {@link CommandContext} in order to settle the command
* message transfer and finish the trace span associated with the {@link CommandContext}.
* @param lifespanSeconds The time period in which the command consumer shall be active. A negative value is
* interpreted as an unlimited lifespan.
* @param context The currently active OpenTracing span context or {@code null} if no span is currently active.
* An implementation should use this as the parent for any span it creates for tracing
* the execution of this operation.
Expand All @@ -121,6 +126,7 @@ Future<MessageConsumer> createCommandConsumer(
String deviceId,
String gatewayId,
Handler<CommandContext> commandHandler,
int lifespanSeconds,
SpanContext context);

/**
Expand Down
Loading

0 comments on commit 27fca73

Please sign in to comment.