Skip to content

Commit

Permalink
[eclipse-hono#1858] Add lifespan to setCommandHandlingAdapterInstance.
Browse files Browse the repository at this point in the history
This makes it possible to restrict the lifespan of
entries set via the setCommandHandlingAdapterInstance
method of the Device Connection API.

Signed-off-by: Carsten Lohmann <[email protected]>
  • Loading branch information
calohmn committed Apr 16, 2020
1 parent 094df23 commit 53ed37c
Show file tree
Hide file tree
Showing 24 changed files with 365 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.eclipse.hono.client.ConnectionLifecycle;
Expand Down Expand Up @@ -207,6 +208,24 @@ public Future<V> put(final K key, final V value) {

}

/**
* Puts a value to the cache.
*
* @param key The key.
* @param value The value.
* @param lifespan The lifespan of the entry. A negative value is interpreted as an unlimited lifespan.
* @param lifespanUnit The time unit for the lifespan.
* @return A succeeded future containing the previous value or {@code null} if the
* cache didn't contain the key yet.
* A failed future if the value could not be stored in the cache.
*/
@Override
public Future<V> put(final K key, final V value, final long lifespan, final TimeUnit lifespanUnit) {

return withCache(cache -> cache.putAsync(key, value, lifespan, lifespanUnit));

}

/**
* Remove a key/value mapping from the cache.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
Expand Down Expand Up @@ -47,6 +48,19 @@ public interface Cache<K, V> {
*/
Future<V> put(K key, V value);

/**
* Puts a value to the cache.
*
* @param key The key.
* @param value The value.
* @param lifespan The lifespan of the entry. A negative value is interpreted as an unlimited lifespan.
* @param lifespanUnit The time unit for the lifespan.
* @return A succeeded future containing the previous value or {@code null} if the
* cache didn't contain the key yet.
* A failed future if the value could not be stored in the cache.
*/
Future<V> put(K key, V value, long lifespan, TimeUnit lifespanUnit);

/**
* Gets a value from the cache.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ public Future<JsonObject> getLastKnownGatewayForDevice(final String deviceId, fi

@Override
public Future<Void> setCommandHandlingAdapterInstance(final String deviceId, final String adapterInstanceId,
final SpanContext context) {
return cache.setCommandHandlingAdapterInstance(tenantId, deviceId, adapterInstanceId, context);
final int lifespanSeconds, final SpanContext context) {
return cache.setCommandHandlingAdapterInstance(tenantId, deviceId, adapterInstanceId, lifespanSeconds, context);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.eclipse.hono.client.ClientErrorException;
Expand Down Expand Up @@ -138,12 +139,12 @@ public Future<JsonObject> getLastKnownGatewayForDevice(final String tenantId, fi

@Override
public Future<Void> setCommandHandlingAdapterInstance(final String tenantId, final String deviceId,
final String adapterInstanceId, final SpanContext context) {
final String adapterInstanceId, final int lifespanSeconds, final SpanContext context) {
Objects.requireNonNull(tenantId);
Objects.requireNonNull(deviceId);
Objects.requireNonNull(adapterInstanceId);

return cache.put(getAdapterInstanceEntryKey(tenantId, deviceId), adapterInstanceId)
return cache.put(getAdapterInstanceEntryKey(tenantId, deviceId), adapterInstanceId, lifespanSeconds, TimeUnit.SECONDS)
.map(replacedValue -> {
LOG.debug("set command handling adapter instance [tenant: {}, device-id: {}, adapter-instance: {}]",
tenantId, deviceId, adapterInstanceId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public interface DeviceConnectionInfo {
* @param tenantId The tenant id.
* @param deviceId The device id.
* @param adapterInstanceId The protocol adapter instance id.
* @param lifespanSeconds The lifespan of the mapping entry in seconds. A negative value is interpreted as an
* unlimited lifespan.
* @param context The currently active OpenTracing span context or {@code null} if no span is currently active.
* Implementing classes should use this as the parent for any span they create for tracing
* the execution of this operation.
Expand All @@ -82,7 +84,8 @@ public interface DeviceConnectionInfo {
* Otherwise the future will be failed with a {@link org.eclipse.hono.client.ServiceInvocationException}.
* @throws NullPointerException if any of the parameters except context is {@code null}.
*/
Future<Void> setCommandHandlingAdapterInstance(String tenantId, String deviceId, String adapterInstanceId, SpanContext context);
Future<Void> setCommandHandlingAdapterInstance(String tenantId, String deviceId, String adapterInstanceId,
int lifespanSeconds, SpanContext context);

/**
* Removes the mapping information that associates the given device with the given protocol adapter instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
Expand Down Expand Up @@ -134,6 +135,28 @@ void testPutSucceeds(final VertxTestContext ctx) {
}));
}

/**
* Verifies that a request to put a value with a lifespan to the cache
* results in the value being written to the data grid with the given lifespan.
*
* @param ctx The vert.x text context.
*/
@Test
void testPutWithLifespanSucceeds(final VertxTestContext ctx) {
final org.infinispan.commons.api.BasicCache<Object, Object> grid = givenAConnectedCache();
when(grid.putAsync(anyString(), anyString(), anyLong(), any(TimeUnit.class)))
.thenReturn(CompletableFuture.completedFuture("oldValue"));
cache.connect()
.compose(c -> c.put("key", "value", 1, TimeUnit.SECONDS))
.setHandler(ctx.succeeding(v -> {
ctx.verify(() -> {
verify(grid).putAsync("key", "value", 1, TimeUnit.SECONDS);
assertThat(v).isEqualTo("oldValue");
});
ctx.completeNow();
}));
}

/**
* Verifies that a request to put a value to the cache fails with the
* root cause for the failure to access the data grid.
Expand Down
Loading

0 comments on commit 53ed37c

Please sign in to comment.