Skip to content

Commit

Permalink
[#1858] Add 'updateOnly' param to setCommandHandlingAdapterInstance.
Browse files Browse the repository at this point in the history
Signed-off-by: Carsten Lohmann <[email protected]>
  • Loading branch information
calohmn committed May 6, 2020
1 parent bfa7115 commit 478e626
Show file tree
Hide file tree
Showing 21 changed files with 514 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,12 @@ protected <T> Future<T> withCache(final Function<org.infinispan.commons.api.Basi
* @return A succeeded future containing the previous value or {@code null} if the
* cache didn't contain the key yet.
* A failed future if the value could not be stored in the cache.
* @throws NullPointerException if any of the parameters is {@code null}.
*/
@Override
public Future<V> put(final K key, final V value) {
Objects.requireNonNull(key);
Objects.requireNonNull(value);

return withCache(cache -> cache.putAsync(key, value));

Expand All @@ -216,24 +219,55 @@ public Future<V> put(final K key, final V value) {
* @return A succeeded future containing the previous value or {@code null} if the
* cache didn't contain the key yet.
* A failed future if the value could not be stored in the cache.
* @throws NullPointerException if any of the parameters is {@code null}.
*/
@Override
public Future<V> put(final K key, final V value, final long lifespan, final TimeUnit lifespanUnit) {
Objects.requireNonNull(key);
Objects.requireNonNull(value);
Objects.requireNonNull(lifespanUnit);

return withCache(cache -> cache.putAsync(key, value, lifespan, lifespanUnit));

}

/**
* Replaces the entry for a key only if currently mapped to a given value.
*
* @param key The key.
* @param oldValue The value to overwrite.
* @param newValue The value to store.
* @param lifespan The lifespan of the entry. A negative value is interpreted as an unlimited lifespan.
* @param lifespanUnit The time unit for the lifespan.
* @return A succeeded future containing a boolean, indicating whether the value was replaced or not.
* A failed future if the value could not be stored in the cache.
* @throws NullPointerException if any of the parameters is {@code null}.
*/
@Override
public Future<Boolean> replace(final K key, final V oldValue, final V newValue, final long lifespan,
final TimeUnit lifespanUnit) {
Objects.requireNonNull(key);
Objects.requireNonNull(oldValue);
Objects.requireNonNull(newValue);
Objects.requireNonNull(lifespanUnit);

return withCache(cache -> cache.replaceAsync(key, oldValue, newValue, lifespan, lifespanUnit));

}

/**
* Remove a key/value mapping from the cache.
*
* @param key The key.
* @param value The value.
* @return {@code true} if the key was mapped to the value, {@code false}
* otherwise.
* @throws NullPointerException if any of the parameters is {@code null}.
*/
@Override
public Future<Boolean> remove(final K key, final V value) {
Objects.requireNonNull(key);
Objects.requireNonNull(value);

return withCache(cache -> cache.removeAsync(key, value));

Expand All @@ -246,9 +280,11 @@ public Future<Boolean> remove(final K key, final V value) {
* @return A succeeded future containing the value or {@code null} if the
* cache didn't contain the key yet.
* A failed future if the value could not be read from the cache.
* @throws NullPointerException if key is {@code null}.
*/
@Override
public Future<V> get(final K key) {
Objects.requireNonNull(key);

return withCache(cache -> cache.getAsync(key));

Expand All @@ -259,9 +295,11 @@ public Future<V> get(final K key) {
*
* @param keys The keys.
* @return A succeeded future containing a map with key/value pairs.
* @throws NullPointerException if keys is {@code null}.
*/
@Override
public Future<Map<K, V>> getAll(final Set<? extends K> keys) {
Objects.requireNonNull(keys);

return withCache(cache -> cache.getAllAsync(keys));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public interface Cache<K, V> {
* @return A succeeded future containing the previous value or {@code null} if the
* cache didn't contain the key yet.
* A failed future if the value could not be stored in the cache.
* @throws NullPointerException if any of the parameters is {@code null}.
*/
Future<V> put(K key, V value);

Expand All @@ -58,16 +59,32 @@ public interface Cache<K, V> {
* @return A succeeded future containing the previous value or {@code null} if the
* cache didn't contain the key yet.
* A failed future if the value could not be stored in the cache.
* @throws NullPointerException if any of the parameters is {@code null}.
*/
Future<V> put(K key, V value, long lifespan, TimeUnit lifespanUnit);

/**
* Replaces the entry for a key only if currently mapped to a given value.
*
* @param key The key.
* @param oldValue The value to overwrite.
* @param newValue The value to store.
* @param lifespan The lifespan of the entry. A negative value is interpreted as an unlimited lifespan.
* @param lifespanUnit The time unit for the lifespan.
* @return A succeeded future containing a boolean, indicating whether the value was replaced or not.
* A failed future if the value could not be stored in the cache.
* @throws NullPointerException if any of the parameters is {@code null}.
*/
Future<Boolean> replace(K key, V oldValue, V newValue, long lifespan, TimeUnit lifespanUnit);

/**
* Gets a value from the cache.
*
* @param key The key.
* @return A succeeded future containing the value or {@code null} if the
* cache didn't contain the key yet.
* A failed future if the value could not be read from the cache.
* @throws NullPointerException if key is {@code null}.
*/
Future<V> get(K key);

Expand All @@ -79,6 +96,7 @@ public interface Cache<K, V> {
* @return A succeeded future containing {@code true} if the key was
* mapped to the value, {@code false} otherwise.
* A failed future if the value could not be remove from the cache.
* @throws NullPointerException if any of the parameters is {@code null}.
*/
Future<Boolean> remove(K key, V value);

Expand All @@ -87,6 +105,7 @@ public interface Cache<K, V> {
*
* @param keys The keys.
* @return A succeeded future containing a map with key/value pairs.
* @throws NullPointerException if keys is {@code null}.
*/
Future<Map<K, V>> getAll(Set<? extends K> keys);
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ public Future<JsonObject> getLastKnownGatewayForDevice(final String deviceId, fi

@Override
public Future<Void> setCommandHandlingAdapterInstance(final String deviceId, final String adapterInstanceId,
final Duration lifespan, final SpanContext context) {
return cache.setCommandHandlingAdapterInstance(tenantId, deviceId, adapterInstanceId, lifespan, context);
final Duration lifespan, final boolean updateOnly, final SpanContext context) {
return cache.setCommandHandlingAdapterInstance(tenantId, deviceId, adapterInstanceId, lifespan, updateOnly, context);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,25 +140,46 @@ public Future<JsonObject> getLastKnownGatewayForDevice(final String tenantId, fi

@Override
public Future<Void> setCommandHandlingAdapterInstance(final String tenantId, final String deviceId,
final String adapterInstanceId, final Duration lifespan, final SpanContext context) {
final String adapterInstanceId, final Duration lifespan, final boolean updateOnly, final SpanContext context) {
Objects.requireNonNull(tenantId);
Objects.requireNonNull(deviceId);
Objects.requireNonNull(adapterInstanceId);

// sanity check, preventing an ArithmeticException in lifespan.toMillis()
final long lifespanMillis = lifespan == null || lifespan.isNegative()
|| lifespan.getSeconds() > (Long.MAX_VALUE / 1000L) ? -1 : lifespan.toMillis();
return cache.put(getAdapterInstanceEntryKey(tenantId, deviceId), adapterInstanceId, lifespanMillis, TimeUnit.MILLISECONDS)
.map(replacedValue -> {
LOG.debug("set command handling adapter instance [tenant: {}, device-id: {}, adapter-instance: {}, lifespan: {}ms]",
tenantId, deviceId, adapterInstanceId, lifespanMillis);
return (Void) null;
})
.recover(t -> {
LOG.debug("failed to set command handling adapter instance [tenant: {}, device-id: {}, adapter-instance: {}, lifespan: {}ms]",
tenantId, deviceId, adapterInstanceId, lifespanMillis, t);
return Future.failedFuture(new ServerErrorException(HttpURLConnection.HTTP_INTERNAL_ERROR, t));
});
final String key = getAdapterInstanceEntryKey(tenantId, deviceId);
if (updateOnly) {
return cache.replace(key, adapterInstanceId, adapterInstanceId, lifespanMillis, TimeUnit.MILLISECONDS)
.recover(t -> {
LOG.debug("failed to update command handling adapter instance [tenant: {}, device-id: {}, adapter-instance: {}, lifespan: {}ms]",
tenantId, deviceId, adapterInstanceId, lifespanMillis, t);
return Future.failedFuture(new ServerErrorException(HttpURLConnection.HTTP_INTERNAL_ERROR, t));
})
.compose(replaced -> {
if (replaced) {
LOG.debug("updated command handling adapter instance [tenant: {}, device-id: {}, adapter-instance: {}, lifespan: {}ms]",
tenantId, deviceId, adapterInstanceId, lifespanMillis);
return Future.succeededFuture();
} else {
LOG.debug("cannot update non-existing command handling adapter instance entry [tenant: {}, device-id: {}, adapter-instance: {}, lifespan: {}ms]",
tenantId, deviceId, adapterInstanceId, lifespanMillis);
return Future.failedFuture(new ClientErrorException(HttpURLConnection.HTTP_PRECON_FAILED));
}
});
} else {
return cache.put(key, adapterInstanceId, lifespanMillis, TimeUnit.MILLISECONDS)
.recover(t -> {
LOG.debug("failed to set command handling adapter instance [tenant: {}, device-id: {}, adapter-instance: {}, lifespan: {}ms]",
tenantId, deviceId, adapterInstanceId, lifespanMillis, t);
return Future.failedFuture(new ServerErrorException(HttpURLConnection.HTTP_INTERNAL_ERROR, t));
})
.map(replacedValue -> {
LOG.debug("set command handling adapter instance [tenant: {}, device-id: {}, adapter-instance: {}, lifespan: {}ms]",
tenantId, deviceId, adapterInstanceId, lifespanMillis);
return (Void) null;
});
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ public interface DeviceConnectionInfo {
* @param adapterInstanceId The protocol adapter instance id.
* @param lifespan The lifespan of the mapping entry. Using a negative duration or {@code null} here is
* interpreted as an unlimited lifespan.
* @param updateOnly If {@code true}, this operation will only update an existing mapping entry with the given
* lifespan, provided such an entry with the given device id and adapter instance id exists.
* @param context The currently active OpenTracing span context or {@code null} if no span is currently active.
* Implementing classes should use this as the parent for any span they create for tracing
* the execution of this operation.
Expand All @@ -86,7 +88,7 @@ public interface DeviceConnectionInfo {
* @throws NullPointerException if any of the parameters except context is {@code null}.
*/
Future<Void> setCommandHandlingAdapterInstance(String tenantId, String deviceId, String adapterInstanceId,
Duration lifespan, SpanContext context);
Duration lifespan, boolean updateOnly, SpanContext context);

/**
* Removes the mapping information that associates the given device with the given protocol adapter instance
Expand Down
Loading

0 comments on commit 478e626

Please sign in to comment.