Skip to content

Commit

Permalink
[#780] Implement DownstreamSenderFactoryImpl.
Browse files Browse the repository at this point in the history
The DownstreamSenderFactory methods have been moved from
HonoConnectionImpl to DownstreamSenderFactoryImpl.
As part of this, the org.eclipse.hono.client.impl.*ClientImpl hierarchy
has been refactored to use a HonoConnection instance instead of passing
around multiple properties of the connection explicitly. This serves as
the groundwork for implementing the other client factories analogously
to the DownstreamSenderFactory.

Signed-off-by: Kai Hudalla <[email protected]>
  • Loading branch information
Kai Hudalla committed Apr 15, 2019
1 parent 13a7b57 commit cf56777
Show file tree
Hide file tree
Showing 58 changed files with 2,336 additions and 1,968 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
import org.eclipse.hono.client.Command;
import org.eclipse.hono.client.CommandContext;
import org.eclipse.hono.client.CommandResponse;
import org.eclipse.hono.client.DownstreamSender;
import org.eclipse.hono.client.MessageConsumer;
import org.eclipse.hono.client.MessageSender;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.service.AbstractProtocolAdapterBase;
Expand Down Expand Up @@ -856,7 +856,7 @@ private Future<ProtonDelivery> uploadMessage(
private Future<ProtonDelivery> doUploadMessage(
final AmqpContext context,
final ResourceIdentifier resource,
final Future<MessageSender> senderFuture,
final Future<DownstreamSender> senderFuture,
final Span currentSpan) {

LOG.trace("forwarding {} message", context.getEndpoint().getCanonicalName());
Expand All @@ -869,7 +869,7 @@ private Future<ProtonDelivery> doUploadMessage(
return CompositeFuture.all(tenantEnabledFuture, tokenFuture, senderFuture)
.compose(ok -> {

final MessageSender sender = senderFuture.result();
final DownstreamSender sender = senderFuture.result();
final Message downstreamMessage = addProperties(
context.getMessage(),
ResourceIdentifier.from(context.getEndpoint().getCanonicalName(), resource.getTenantId(), resource.getResourceId()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@
import org.eclipse.hono.client.CommandResponse;
import org.eclipse.hono.client.CommandResponseSender;
import org.eclipse.hono.client.CredentialsClientFactory;
import org.eclipse.hono.client.DownstreamSender;
import org.eclipse.hono.client.DownstreamSenderFactory;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.MessageConsumer;
import org.eclipse.hono.client.MessageSender;
import org.eclipse.hono.client.RegistrationClient;
import org.eclipse.hono.client.RegistrationClientFactory;
import org.eclipse.hono.client.TenantClient;
Expand Down Expand Up @@ -227,7 +227,7 @@ public void testAdapterAcceptsAnonymousRelayReceiverOnly() {
public void testUploadTelemetryWithAtMostOnceDeliverySemantics(final TestContext ctx) {
// GIVEN an AMQP adapter with a configured server
final VertxBasedAmqpProtocolAdapter adapter = givenAnAmqpAdapter();
final MessageSender telemetrySender = givenATelemetrySenderForAnyTenant();
final DownstreamSender telemetrySender = givenATelemetrySenderForAnyTenant();
when(telemetrySender.send(any(Message.class), (SpanContext) any())).thenReturn(Future.succeededFuture(mock(ProtonDelivery.class)));

// which is enabled for a tenant
Expand Down Expand Up @@ -265,7 +265,7 @@ public void testUploadTelemetryWithAtMostOnceDeliverySemantics(final TestContext
public void testUploadTelemetryWithAtLeastOnceDeliverySemantics(final TestContext ctx) {
// GIVEN an adapter configured to use a user-define server.
final VertxBasedAmqpProtocolAdapter adapter = givenAnAmqpAdapter();
final MessageSender telemetrySender = givenATelemetrySenderForAnyTenant();
final DownstreamSender telemetrySender = givenATelemetrySenderForAnyTenant();
final Future<ProtonDelivery> downstreamDelivery = Future.future();
when(telemetrySender.sendAndWaitForOutcome(any(Message.class), (SpanContext) any())).thenReturn(downstreamDelivery);

Expand Down Expand Up @@ -311,7 +311,7 @@ public void testUploadTelemetryMessageFailsForDisabledAdapter(final TestContext

// GIVEN an adapter configured to use a user-define server.
final VertxBasedAmqpProtocolAdapter adapter = givenAnAmqpAdapter();
final MessageSender telemetrySender = givenATelemetrySenderForAnyTenant();
final DownstreamSender telemetrySender = givenATelemetrySenderForAnyTenant();

// AND given a tenant for which the AMQP Adapter is disabled
givenAConfiguredTenant(TEST_TENANT_ID, false);
Expand Down Expand Up @@ -352,7 +352,7 @@ public void testUploadEventFailsForGatewayOfDifferentTenant(final TestContext ct

// GIVEN an adapter
final VertxBasedAmqpProtocolAdapter adapter = givenAnAmqpAdapter();
final MessageSender eventSender = givenAnEventSender(Future.future());
final DownstreamSender eventSender = givenAnEventSender(Future.future());

// with an enabled tenant
givenAConfiguredTenant(TEST_TENANT_ID, true);
Expand Down Expand Up @@ -386,7 +386,7 @@ public void testAdapterOpensSenderLinkAndNotifyDownstreamApplication() {
// GIVEN an AMQP adapter configured to use a user-defined server
final VertxBasedAmqpProtocolAdapter adapter = givenAnAmqpAdapter();
final Future<ProtonDelivery> outcome = Future.future();
final MessageSender eventSender = givenAnEventSender(outcome);
final DownstreamSender eventSender = givenAnEventSender(outcome);

// WHEN an unauthenticated device opens a receiver link with a valid source address
final ProtonConnection deviceConnection = mock(ProtonConnection.class);
Expand Down Expand Up @@ -420,7 +420,7 @@ public void testAdapterClosesCommandConsumerWhenDeviceClosesReceiverLink() {
// GIVEN an AMQP adapter
final VertxBasedAmqpProtocolAdapter adapter = givenAnAmqpAdapter();
final Future<ProtonDelivery> outcome = Future.future();
final MessageSender eventSender = givenAnEventSender(outcome);
final DownstreamSender eventSender = givenAnEventSender(outcome);

// and a device that wants to receive commands
final MessageConsumer commandConsumer = mock(MessageConsumer.class);
Expand Down Expand Up @@ -496,7 +496,7 @@ public void testAdapterClosesCommandConsumerWhenConnectionToDeviceIsLost(final T
private void testAdapterClosesCommandConsumer(final TestContext ctx, final Handler<ProtonConnection> connectionLossTrigger) {

// GIVEN an AMQP adapter
final MessageSender downstreamEventSender = givenAnEventSender(Future.succeededFuture());
final DownstreamSender downstreamEventSender = givenAnEventSender(Future.succeededFuture());
final ProtonServer server = getAmqpServer();
final VertxBasedAmqpProtocolAdapter adapter = getAdapter(server);

Expand Down Expand Up @@ -809,8 +809,8 @@ private Message getFakeMessage(final String to, final Buffer payload, final Stri
return message;
}

private MessageSender givenATelemetrySenderForAnyTenant() {
final MessageSender sender = mock(MessageSender.class);
private DownstreamSender givenATelemetrySenderForAnyTenant() {
final DownstreamSender sender = mock(DownstreamSender.class);
when(downstreamSenderFactory.getOrCreateTelemetrySender(anyString())).thenReturn(Future.succeededFuture(sender));
return sender;
}
Expand All @@ -822,8 +822,8 @@ private CommandResponseSender givenACommandResponseSenderForAnyTenant() {
return responseSender;
}

private MessageSender givenAnEventSender(final Future<ProtonDelivery> outcome) {
final MessageSender sender = mock(MessageSender.class);
private DownstreamSender givenAnEventSender(final Future<ProtonDelivery> outcome) {
final DownstreamSender sender = mock(DownstreamSender.class);
when(sender.sendAndWaitForOutcome(any(Message.class), (SpanContext) any())).thenReturn(outcome);

when(downstreamSenderFactory.getOrCreateEventSender(anyString())).thenReturn(Future.succeededFuture(sender));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.eclipse.californium.scandium.config.DtlsConnectorConfig;
import org.eclipse.hono.auth.Device;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.MessageSender;
import org.eclipse.hono.client.DownstreamSender;
import org.eclipse.hono.config.KeyLoader;
import org.eclipse.hono.service.AbstractProtocolAdapterBase;
import org.eclipse.hono.service.metric.MetricsTags;
Expand Down Expand Up @@ -496,7 +496,7 @@ private void doUploadMessage(
final boolean waitForOutcome,
final Buffer payload,
final String contentType,
final Future<MessageSender> senderTracker,
final Future<DownstreamSender> senderTracker,
final MetricsTags.EndpointType endpoint) {

if (contentType == null) {
Expand All @@ -512,7 +512,7 @@ private void doUploadMessage(
final Future<TenantObject> tenantEnabledTracker = getTenantConfiguration(device.getTenantId(), null)
.compose(tenantObject -> isAdapterEnabled(tenantObject));
CompositeFuture.all(tokenTracker, senderTracker, tenantEnabledTracker).compose(ok -> {
final MessageSender sender = senderTracker.result();
final DownstreamSender sender = senderTracker.result();
final Message downstreamMessage = newMessage(
ResourceIdentifier.from(endpoint.getCanonicalName(), device.getTenantId(), device.getDeviceId()),
"/" + context.getExchange().getRequestOptions().getUriPathString(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.CommandConsumerFactory;
import org.eclipse.hono.client.CredentialsClientFactory;
import org.eclipse.hono.client.DownstreamSender;
import org.eclipse.hono.client.DownstreamSenderFactory;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.MessageSender;
import org.eclipse.hono.client.RegistrationClient;
import org.eclipse.hono.client.RegistrationClientFactory;
import org.eclipse.hono.client.TenantClient;
Expand Down Expand Up @@ -312,7 +312,7 @@ public void testStartDoesNotInvokeOnStartupSuccessIfStartupFails(final TestConte
public void testUploadTelemetryFailsForDisabledTenant() {

// GIVEN an adapter
final MessageSender sender = mock(MessageSender.class);
final DownstreamSender sender = mock(DownstreamSender.class);
when(downstreamSenderFactory.getOrCreateTelemetrySender(anyString())).thenReturn(Future.succeededFuture(sender));
// which is disabled for tenant "my-tenant"
final TenantObject myTenantConfig = TenantObject.from("my-tenant", true);
Expand Down Expand Up @@ -528,23 +528,23 @@ protected void onStartupSuccess() {

private void givenAnEventSenderForOutcome(final Future<ProtonDelivery> outcome) {

final MessageSender sender = mock(MessageSender.class);
final DownstreamSender sender = mock(DownstreamSender.class);
when(sender.sendAndWaitForOutcome(any(Message.class))).thenReturn(outcome);

when(downstreamSenderFactory.getOrCreateEventSender(anyString())).thenReturn(Future.succeededFuture(sender));
}

private void givenATelemetrySenderForOutcome(final Future<ProtonDelivery> outcome) {

final MessageSender sender = mock(MessageSender.class);
final DownstreamSender sender = mock(DownstreamSender.class);
when(sender.sendAndWaitForOutcome(any(Message.class))).thenReturn(outcome);

when(downstreamSenderFactory.getOrCreateTelemetrySender(anyString())).thenReturn(Future.succeededFuture(sender));
}

private void givenATelemetrySender(final Future<ProtonDelivery> outcome) {

final MessageSender sender = mock(MessageSender.class);
final DownstreamSender sender = mock(DownstreamSender.class);
when(sender.send(any(Message.class))).thenReturn(outcome);

when(downstreamSenderFactory.getOrCreateTelemetrySender(anyString())).thenReturn(Future.succeededFuture(sender));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import org.eclipse.hono.client.Command;
import org.eclipse.hono.client.CommandContext;
import org.eclipse.hono.client.CommandResponse;
import org.eclipse.hono.client.DownstreamSender;
import org.eclipse.hono.client.MessageConsumer;
import org.eclipse.hono.client.MessageSender;
import org.eclipse.hono.client.ResourceConflictException;
import org.eclipse.hono.service.AbstractProtocolAdapterBase;
import org.eclipse.hono.service.auth.DeviceUser;
Expand Down Expand Up @@ -574,7 +574,7 @@ private void doUploadMessage(
final String deviceId,
final Buffer payload,
final String contentType,
final Future<MessageSender> senderTracker,
final Future<DownstreamSender> senderTracker,
final MetricsTags.EndpointType endpoint) {

if (!isPayloadOfIndicatedType(payload, contentType)) {
Expand Down Expand Up @@ -625,7 +625,7 @@ private void doUploadMessage(
CompositeFuture.all(senderTracker, commandConsumerTracker)
.compose(ok -> {

final MessageSender sender = senderTracker.result();
final DownstreamSender sender = senderTracker.result();

final Integer ttd = Optional.ofNullable(commandConsumerTracker.result()).map(c -> ttdTracker.result())
.orElse(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@

import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.CommandConsumer;
import org.eclipse.hono.client.CommandConsumerFactory;
import org.eclipse.hono.client.CommandContext;
import org.eclipse.hono.client.CommandResponse;
import org.eclipse.hono.client.CommandResponseSender;
import org.eclipse.hono.client.CredentialsClientFactory;
import org.eclipse.hono.client.DownstreamSender;
import org.eclipse.hono.client.DownstreamSenderFactory;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.MessageSender;
import org.eclipse.hono.client.MessageConsumer;
import org.eclipse.hono.client.RegistrationClient;
import org.eclipse.hono.client.RegistrationClientFactory;
import org.eclipse.hono.client.ResourceConflictException;
Expand Down Expand Up @@ -112,7 +112,7 @@ public class AbstractVertxBasedHttpProtocolAdapterTest {
private TenantClient tenantClient;
private HttpProtocolAdapterProperties config;
private CommandConsumerFactory commandConsumerFactory;
private CommandConsumer commandConsumer;
private MessageConsumer commandConsumer;
private Vertx vertx;
private Context context;
private HttpAdapterMetrics metrics;
Expand Down Expand Up @@ -161,7 +161,7 @@ public void setup() {
when(registrationClientFactory.connect()).thenReturn(Future.succeededFuture(mock(HonoConnection.class)));
when(registrationClientFactory.getOrCreateRegistrationClient(anyString())).thenReturn(Future.succeededFuture(regClient));

commandConsumer = mock(CommandConsumer.class);
commandConsumer = mock(MessageConsumer.class);
doAnswer(invocation -> {
final Handler<AsyncResult<Void>> resultHandler = invocation.getArgument(0);
if (resultHandler != null) {
Expand Down Expand Up @@ -264,7 +264,7 @@ public void testUploadTelemetryFailsForDisabledTenant() {

// GIVEN an adapter
final HttpServer server = getHttpServer(false);
final MessageSender sender = givenATelemetrySenderForOutcome(Future.succeededFuture());
final DownstreamSender sender = givenATelemetrySenderForOutcome(Future.succeededFuture());

// which is disabled for tenant "my-tenant"
final TenantObject myTenantConfig = TenantObject.from("my-tenant", true);
Expand Down Expand Up @@ -310,7 +310,7 @@ public void testUploadTelemetryFailsForUnknownDevice() {

// GIVEN an adapter
final HttpServer server = getHttpServer(false);
final MessageSender sender = givenATelemetrySenderForOutcome(Future.succeededFuture());
final DownstreamSender sender = givenATelemetrySenderForOutcome(Future.succeededFuture());

// with an enabled tenant
final TenantObject myTenantConfig = TenantObject.from("my-tenant", true);
Expand Down Expand Up @@ -642,7 +642,7 @@ public void testUploadTelemetryRemovesTtdIfCommandConsumerIsInUse() {
// GIVEN an adapter with a downstream telemetry consumer attached
final HttpServer server = getHttpServer(false);
final AbstractVertxBasedHttpProtocolAdapter<HttpProtocolAdapterProperties> adapter = getAdapter(server, null);
final MessageSender sender = givenATelemetrySenderForOutcome(Future.succeededFuture());
final DownstreamSender sender = givenATelemetrySenderForOutcome(Future.succeededFuture());

// WHEN a device publishes a telemetry message with a TTD
final Buffer payload = Buffer.buffer("some payload");
Expand Down Expand Up @@ -689,7 +689,7 @@ public void testUploadEmptyNotificationSucceedsIfCommandConsumerIsInUse() {
// GIVEN an adapter with a downstream event consumer attached
final HttpServer server = getHttpServer(false);
final AbstractVertxBasedHttpProtocolAdapter<HttpProtocolAdapterProperties> adapter = getAdapter(server, null);
final MessageSender sender = givenAnEventSenderForOutcome(Future.succeededFuture());
final DownstreamSender sender = givenAnEventSenderForOutcome(Future.succeededFuture());

// WHEN a device publishes an empty notification event with a TTD
final HttpServerResponse response = mock(HttpServerResponse.class);
Expand Down Expand Up @@ -726,7 +726,7 @@ public void testUploadTelemetryUsesConfiguredMaxTtd() {
// GIVEN an adapter with a downstream telemetry consumer attached
final HttpServer server = getHttpServer(false);
final AbstractVertxBasedHttpProtocolAdapter<HttpProtocolAdapterProperties> adapter = getAdapter(server, null);
final MessageSender sender = givenATelemetrySenderForOutcome(Future.succeededFuture());
final DownstreamSender sender = givenATelemetrySenderForOutcome(Future.succeededFuture());

// WHEN a device publishes a telemetry message that belongs to a tenant with
// a max TTD of 20 secs
Expand Down Expand Up @@ -866,18 +866,18 @@ private CommandResponseSender givenACommandResponseSenderForOutcome(final Future
return sender;
}

private MessageSender givenAnEventSenderForOutcome(final Future<ProtonDelivery> outcome) {
private DownstreamSender givenAnEventSenderForOutcome(final Future<ProtonDelivery> outcome) {

final MessageSender sender = mock(MessageSender.class);
final DownstreamSender sender = mock(DownstreamSender.class);
when(sender.sendAndWaitForOutcome(any(Message.class), (SpanContext) any())).thenReturn(outcome);

when(downstreamSenderFactory.getOrCreateEventSender(anyString())).thenReturn(Future.succeededFuture(sender));
return sender;
}

private MessageSender givenATelemetrySenderForOutcome(final Future<ProtonDelivery> outcome) {
private DownstreamSender givenATelemetrySenderForOutcome(final Future<ProtonDelivery> outcome) {

final MessageSender sender = mock(MessageSender.class);
final DownstreamSender sender = mock(DownstreamSender.class);
when(sender.send(any(Message.class), (SpanContext) any())).thenReturn(outcome);

when(downstreamSenderFactory.getOrCreateTelemetrySender(anyString())).thenReturn(Future.succeededFuture(sender));
Expand Down
Loading

0 comments on commit cf56777

Please sign in to comment.