From 7b6568a521a580af7d4ad73a060fd758e34500df Mon Sep 17 00:00:00 2001
From: dtspence <33552925+dtspence@users.noreply.github.com>
Date: Fri, 4 Oct 2024 02:15:10 +0000
Subject: [PATCH 1/8] Field configuration helper cache implementation
---
warehouse/ingest-core/pom.xml | 15 +++
.../data/config/CachedFieldConfigHelper.java | 75 ++++++++++++++
.../data/config/ingest/BaseIngestHelper.java | 15 ++-
.../config/CachingFieldConfigHelperTest.java | 99 +++++++++++++++++++
4 files changed, 203 insertions(+), 1 deletion(-)
create mode 100644 warehouse/ingest-core/src/main/java/datawave/ingest/data/config/CachedFieldConfigHelper.java
create mode 100644 warehouse/ingest-core/src/test/java/datawave/ingest/data/config/CachingFieldConfigHelperTest.java
diff --git a/warehouse/ingest-core/pom.xml b/warehouse/ingest-core/pom.xml
index 90fb606c9be..014454ea370 100644
--- a/warehouse/ingest-core/pom.xml
+++ b/warehouse/ingest-core/pom.xml
@@ -212,6 +212,21 @@
javassist
test
+
+ org.junit.jupiter
+ junit-jupiter-api
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-engine
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-params
+ test
+
diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/CachedFieldConfigHelper.java b/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/CachedFieldConfigHelper.java
new file mode 100644
index 00000000000..48f3c4b7e38
--- /dev/null
+++ b/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/CachedFieldConfigHelper.java
@@ -0,0 +1,75 @@
+package datawave.ingest.data.config;
+
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.function.Function;
+
+import org.apache.commons.collections4.map.LRUMap;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class CachedFieldConfigHelper implements FieldConfigHelper {
+ private final FieldConfigHelper underlyingHelper;
+ private final Map resultCache;
+
+ enum AttributeType {
+ INDEXED_FIELD, REVERSE_INDEXED_FIELD, TOKENIZED_FIELD, REVERSE_TOKENIZED_FIELD, STORED_FIELD, INDEXED_ONLY
+ }
+
+ public CachedFieldConfigHelper(FieldConfigHelper helper, int limit) {
+ if (limit < 1) {
+ throw new IllegalArgumentException("Limit must be a positive integer");
+ }
+ this.underlyingHelper = helper;
+ this.resultCache = new LRUMap<>(limit);
+ }
+
+ @Override
+ public boolean isStoredField(String fieldName) {
+ return getOrEvaluate(AttributeType.STORED_FIELD, fieldName, underlyingHelper::isStoredField);
+ }
+
+ @Override
+ public boolean isIndexedField(String fieldName) {
+ return getOrEvaluate(AttributeType.INDEXED_FIELD, fieldName, underlyingHelper::isIndexedField);
+ }
+
+ @Override
+ public boolean isIndexOnlyField(String fieldName) {
+ return getOrEvaluate(AttributeType.INDEXED_ONLY, fieldName, underlyingHelper::isIndexOnlyField);
+ }
+
+ @Override
+ public boolean isReverseIndexedField(String fieldName) {
+ return getOrEvaluate(AttributeType.REVERSE_INDEXED_FIELD, fieldName, underlyingHelper::isReverseIndexedField);
+ }
+
+ @Override
+ public boolean isTokenizedField(String fieldName) {
+ return getOrEvaluate(AttributeType.TOKENIZED_FIELD, fieldName, underlyingHelper::isTokenizedField);
+ }
+
+ @Override
+ public boolean isReverseTokenizedField(String fieldName) {
+ return getOrEvaluate(AttributeType.REVERSE_TOKENIZED_FIELD, fieldName, underlyingHelper::isReverseTokenizedField);
+ }
+
+ @VisibleForTesting
+ boolean getOrEvaluate(AttributeType attributeType, String fieldName, Function evaluateFn) {
+ return resultCache.computeIfAbsent(fieldName, ResultEntry::new).resolveResult(attributeType, evaluateFn);
+ }
+
+ private static class ResultEntry {
+ private final String fieldName;
+ private final EnumMap resultMap;
+
+ ResultEntry(String fieldName) {
+ this.fieldName = fieldName;
+ this.resultMap = new EnumMap<>(AttributeType.class);
+ }
+
+ boolean resolveResult(AttributeType attributeType, Function evaluateFn) {
+ return resultMap.computeIfAbsent(attributeType, (t) -> evaluateFn.apply(fieldName));
+ }
+ }
+}
diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/BaseIngestHelper.java b/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/BaseIngestHelper.java
index c3d28d3a2d8..767566abb7b 100644
--- a/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/BaseIngestHelper.java
+++ b/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/BaseIngestHelper.java
@@ -33,6 +33,7 @@
import datawave.ingest.config.IngestConfigurationFactory;
import datawave.ingest.data.Type;
import datawave.ingest.data.TypeRegistry;
+import datawave.ingest.data.config.CachedFieldConfigHelper;
import datawave.ingest.data.config.DataTypeHelperImpl;
import datawave.ingest.data.config.FieldConfigHelper;
import datawave.ingest.data.config.MarkingsHelper;
@@ -138,9 +139,14 @@ public abstract class BaseIngestHelper extends AbstractIngestHelper implements C
public static final String FIELD_FAILED_NORMALIZATION_POLICY = ".data.field.normalization.failure.policy";
public static final String FIELD_CONFIG_FILE = ".data.category.field.config.file";
+ public static final String FIELD_CONFIG_CACHE_ENABLED = ".data.category.field.config.cache.enabled";
+ public static final String FIELD_CONFIG_CACHE_KEY_LIMIT = ".data.category.field.config.cache.limit";
private static final Logger log = ThreadConfigurableLogger.getLogger(BaseIngestHelper.class);
+ private static final boolean DEFAULT_FIELD_CACHE_ENABLED = false;
+ private static final int DEFAULT_FIELD_CACHE_LIMIT = 100;
+
private Multimap> typeFieldMap = null;
private Multimap> typePatternMap = null;
private TreeMultimap> typeCompiledPatternMap = null;
@@ -255,10 +261,17 @@ public void setup(Configuration config) {
// Load the field helper, which takes precedence over the individual field configurations
final String fieldConfigFile = config.get(this.getType().typeName() + FIELD_CONFIG_FILE);
if (fieldConfigFile != null) {
+ final boolean fieldConfigCacheEnabled = config.getBoolean(this.getType().typeName() + FIELD_CONFIG_CACHE_ENABLED, DEFAULT_FIELD_CACHE_ENABLED);
+ final int fieldConfigCacheLimit = config.getInt(this.getType().typeName() + FIELD_CONFIG_CACHE_KEY_LIMIT, DEFAULT_FIELD_CACHE_LIMIT);
if (log.isDebugEnabled()) {
log.debug("Field config file " + fieldConfigFile + " specified for: " + this.getType().typeName() + FIELD_CONFIG_FILE);
+ log.debug("Field config cache enabled: " + fieldConfigCacheEnabled);
+ if (fieldConfigCacheEnabled) {
+ log.debug("Field config cache limit: " + fieldConfigCacheLimit);
+ }
}
- this.fieldConfigHelper = XMLFieldConfigHelper.load(fieldConfigFile, this);
+ final FieldConfigHelper baseHelper = XMLFieldConfigHelper.load(fieldConfigFile, this);
+ fieldConfigHelper = fieldConfigCacheEnabled ? new CachedFieldConfigHelper(baseHelper, fieldConfigCacheLimit) : baseHelper;
}
// Process the indexed fields
diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/CachingFieldConfigHelperTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/CachingFieldConfigHelperTest.java
new file mode 100644
index 00000000000..48392823fbc
--- /dev/null
+++ b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/CachingFieldConfigHelperTest.java
@@ -0,0 +1,99 @@
+package datawave.ingest.data.config;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+public class CachingFieldConfigHelperTest {
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testCachingBehaviorWillCallBaseMethods() {
+ // @formatter:off
+ Stream.of(new Object[][] {
+ new Object[] {
+ (BiConsumer) FieldConfigHelper::isIndexOnlyField,
+ (BiConsumer) (h, f) -> verify(h).isIndexOnlyField(eq(f)),
+ (BiConsumer) FieldConfigHelper::isIndexedField,
+ (BiConsumer) (h, f) -> verify(h).isIndexedField(eq(f)),
+ (BiConsumer) FieldConfigHelper::isTokenizedField,
+ (BiConsumer) (h, f) -> verify(h).isTokenizedField(eq(f)),
+ (BiConsumer) FieldConfigHelper::isStoredField,
+ (BiConsumer) (h, f) -> verify(h).isStoredField(eq(f)),
+ (BiConsumer) FieldConfigHelper::isReverseIndexedField,
+ (BiConsumer) (h, f) -> verify(h).isReverseIndexedField(eq(f)),
+ (BiConsumer) FieldConfigHelper::isReverseTokenizedField,
+ (BiConsumer) (h, f) -> verify(h).isReverseTokenizedField(eq(f)),
+ }
+ }).forEach(arg -> {
+ // param[0] = helper method
+ // param[1] = validation method
+ String fieldName = "testField";
+ BiConsumer testAction = (BiConsumer) arg[0];
+ BiConsumer verifyAction = (BiConsumer) arg[1];
+ FieldConfigHelper mockHelper = mock(FieldConfigHelper.class);
+ FieldConfigHelper cachedHelper = new CachedFieldConfigHelper(mockHelper, 1);
+ testAction.accept(cachedHelper, fieldName);
+ verifyAction.accept(mockHelper, fieldName);
+ });
+ // @formatter:on
+ }
+
+ @ParameterizedTest
+ @ValueSource(ints = {-1, 0})
+ public void testConstructorWithNonPositiveLimitWillThrow(int limit) {
+ assertThrows(IllegalArgumentException.class, () -> new CachedFieldConfigHelper(mock(FieldConfigHelper.class), limit));
+ }
+
+ @Test
+ public void testCachingLimitsBetweenFieldsAndAttributeTypes() {
+ AtomicLong counter = new AtomicLong();
+ CachedFieldConfigHelper helper = new CachedFieldConfigHelper(mock(FieldConfigHelper.class), 2);
+ Function fn = (f) -> {
+ counter.incrementAndGet();
+ return true;
+ };
+
+ // following ensures that:
+ // 1. fields are computed, where appropriate per attribute-type
+ // 2. limit allows cache results to return
+ // 3. limit blocks results to return if exceeded
+ // 4. limit functions across attribute-types
+
+ helper.getOrEvaluate(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field1", fn);
+ Assertions.assertEquals(1, counter.get(), "field1 should compute result (new field)");
+
+ helper.getOrEvaluate(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field1", fn);
+ Assertions.assertEquals(1, counter.get(), "field1 repeated (existing field)");
+
+ helper.getOrEvaluate(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field2", fn);
+ Assertions.assertEquals(2, counter.get(), "field2 should compute result (new field)");
+
+ helper.getOrEvaluate(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field2", fn);
+ Assertions.assertEquals(2, counter.get(), "field2 repeated (existing)");
+
+ helper.getOrEvaluate(CachedFieldConfigHelper.AttributeType.INDEXED_FIELD, "field1", fn);
+ Assertions.assertEquals(3, counter.get(), "field1 should compute result (new attribute)");
+
+ helper.getOrEvaluate(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field3", fn);
+ Assertions.assertEquals(4, counter.get(), "field3 exceeded limit (new field)");
+
+ helper.getOrEvaluate(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field3", fn);
+ Assertions.assertEquals(4, counter.get(), "field3 exceeded limit (existing field)");
+
+ // LRU map should evict field #2
+ // we access field #1 above which has more accesses over field #2
+ helper.getOrEvaluate(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field2", fn);
+ Assertions.assertEquals(5, counter.get(), "field1 exceeded limit (new field/eviction)");
+ }
+}
From ce58f0424a8d33be100463e461b96d4b25dc7885 Mon Sep 17 00:00:00 2001
From: dtspence <33552925+dtspence@users.noreply.github.com>
Date: Thu, 24 Oct 2024 14:47:52 +0000
Subject: [PATCH 2/8] Refactored unit test and fixed test bug
---
.../config/CachingFieldConfigHelperTest.java | 52 ++++++++-----------
1 file changed, 21 insertions(+), 31 deletions(-)
diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/CachingFieldConfigHelperTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/CachingFieldConfigHelperTest.java
index 48392823fbc..f1d0deaa8fd 100644
--- a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/CachingFieldConfigHelperTest.java
+++ b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/CachingFieldConfigHelperTest.java
@@ -6,9 +6,7 @@
import static org.mockito.Mockito.verify;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.BiConsumer;
import java.util.function.Function;
-import java.util.stream.Stream;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -16,37 +14,29 @@
import org.junit.jupiter.params.provider.ValueSource;
public class CachingFieldConfigHelperTest {
- @SuppressWarnings("unchecked")
@Test
public void testCachingBehaviorWillCallBaseMethods() {
- // @formatter:off
- Stream.of(new Object[][] {
- new Object[] {
- (BiConsumer) FieldConfigHelper::isIndexOnlyField,
- (BiConsumer) (h, f) -> verify(h).isIndexOnlyField(eq(f)),
- (BiConsumer) FieldConfigHelper::isIndexedField,
- (BiConsumer) (h, f) -> verify(h).isIndexedField(eq(f)),
- (BiConsumer) FieldConfigHelper::isTokenizedField,
- (BiConsumer) (h, f) -> verify(h).isTokenizedField(eq(f)),
- (BiConsumer) FieldConfigHelper::isStoredField,
- (BiConsumer) (h, f) -> verify(h).isStoredField(eq(f)),
- (BiConsumer) FieldConfigHelper::isReverseIndexedField,
- (BiConsumer) (h, f) -> verify(h).isReverseIndexedField(eq(f)),
- (BiConsumer) FieldConfigHelper::isReverseTokenizedField,
- (BiConsumer) (h, f) -> verify(h).isReverseTokenizedField(eq(f)),
- }
- }).forEach(arg -> {
- // param[0] = helper method
- // param[1] = validation method
- String fieldName = "testField";
- BiConsumer testAction = (BiConsumer) arg[0];
- BiConsumer verifyAction = (BiConsumer) arg[1];
- FieldConfigHelper mockHelper = mock(FieldConfigHelper.class);
- FieldConfigHelper cachedHelper = new CachedFieldConfigHelper(mockHelper, 1);
- testAction.accept(cachedHelper, fieldName);
- verifyAction.accept(mockHelper, fieldName);
- });
- // @formatter:on
+ String fieldName = "test";
+ FieldConfigHelper mockHelper = mock(FieldConfigHelper.class);
+ FieldConfigHelper cachedHelper = new CachedFieldConfigHelper(mockHelper, 1);
+
+ cachedHelper.isIndexOnlyField(fieldName);
+ verify(mockHelper).isIndexOnlyField(eq(fieldName));
+
+ cachedHelper.isIndexedField(fieldName);
+ verify(mockHelper).isIndexedField(eq(fieldName));
+
+ cachedHelper.isTokenizedField(fieldName);
+ verify(mockHelper).isTokenizedField(eq(fieldName));
+
+ cachedHelper.isStoredField(fieldName);
+ verify(mockHelper).isStoredField(eq(fieldName));
+
+ cachedHelper.isReverseIndexedField(fieldName);
+ verify(mockHelper).isReverseIndexedField(eq(fieldName));
+
+ cachedHelper.isReverseTokenizedField(fieldName);
+ verify(mockHelper).isReverseTokenizedField(eq(fieldName));
}
@ParameterizedTest
From 935db13a9323bb6826e0b2ace7e7f3cec2974443 Mon Sep 17 00:00:00 2001
From: dtspence <33552925+dtspence@users.noreply.github.com>
Date: Thu, 24 Oct 2024 18:17:30 +0000
Subject: [PATCH 3/8] Refactored implementation to use explicit switch and unit
test update
---
.../data/config/CachedFieldConfigHelper.java | 83 ++++++++++++++-----
.../config/CachingFieldConfigHelperTest.java | 66 ++++++++++-----
2 files changed, 107 insertions(+), 42 deletions(-)
diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/CachedFieldConfigHelper.java b/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/CachedFieldConfigHelper.java
index 48f3c4b7e38..ec3e602e717 100644
--- a/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/CachedFieldConfigHelper.java
+++ b/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/CachedFieldConfigHelper.java
@@ -1,8 +1,7 @@
package datawave.ingest.data.config;
-import java.util.EnumMap;
import java.util.Map;
-import java.util.function.Function;
+import java.util.function.Predicate;
import org.apache.commons.collections4.map.LRUMap;
@@ -10,10 +9,10 @@
public class CachedFieldConfigHelper implements FieldConfigHelper {
private final FieldConfigHelper underlyingHelper;
- private final Map resultCache;
+ private final Map resultCache;
enum AttributeType {
- INDEXED_FIELD, REVERSE_INDEXED_FIELD, TOKENIZED_FIELD, REVERSE_TOKENIZED_FIELD, STORED_FIELD, INDEXED_ONLY
+ INDEXED_FIELD, REVERSE_INDEXED_FIELD, TOKENIZED_FIELD, REVERSE_TOKENIZED_FIELD, STORED_FIELD, INDEX_ONLY_FIELD
}
public CachedFieldConfigHelper(FieldConfigHelper helper, int limit) {
@@ -26,50 +25,96 @@ public CachedFieldConfigHelper(FieldConfigHelper helper, int limit) {
@Override
public boolean isStoredField(String fieldName) {
- return getOrEvaluate(AttributeType.STORED_FIELD, fieldName, underlyingHelper::isStoredField);
+ return getFieldResult(AttributeType.STORED_FIELD, fieldName, underlyingHelper::isStoredField);
}
@Override
public boolean isIndexedField(String fieldName) {
- return getOrEvaluate(AttributeType.INDEXED_FIELD, fieldName, underlyingHelper::isIndexedField);
+ return getFieldResult(AttributeType.INDEXED_FIELD, fieldName, underlyingHelper::isIndexedField);
}
@Override
public boolean isIndexOnlyField(String fieldName) {
- return getOrEvaluate(AttributeType.INDEXED_ONLY, fieldName, underlyingHelper::isIndexOnlyField);
+ return getFieldResult(AttributeType.INDEX_ONLY_FIELD, fieldName, underlyingHelper::isIndexOnlyField);
}
@Override
public boolean isReverseIndexedField(String fieldName) {
- return getOrEvaluate(AttributeType.REVERSE_INDEXED_FIELD, fieldName, underlyingHelper::isReverseIndexedField);
+ return getFieldResult(AttributeType.REVERSE_INDEXED_FIELD, fieldName, underlyingHelper::isReverseIndexedField);
}
@Override
public boolean isTokenizedField(String fieldName) {
- return getOrEvaluate(AttributeType.TOKENIZED_FIELD, fieldName, underlyingHelper::isTokenizedField);
+ return getFieldResult(AttributeType.TOKENIZED_FIELD, fieldName, underlyingHelper::isTokenizedField);
}
@Override
public boolean isReverseTokenizedField(String fieldName) {
- return getOrEvaluate(AttributeType.REVERSE_TOKENIZED_FIELD, fieldName, underlyingHelper::isReverseTokenizedField);
+ return getFieldResult(AttributeType.REVERSE_TOKENIZED_FIELD, fieldName, underlyingHelper::isReverseTokenizedField);
}
@VisibleForTesting
- boolean getOrEvaluate(AttributeType attributeType, String fieldName, Function evaluateFn) {
- return resultCache.computeIfAbsent(fieldName, ResultEntry::new).resolveResult(attributeType, evaluateFn);
+ boolean getFieldResult(AttributeType attributeType, String fieldName, Predicate fn) {
+ return resultCache.computeIfAbsent(fieldName, CachedEntry::new).get(attributeType).getResultOrEvaluate(fn);
}
- private static class ResultEntry {
+ private static class CachedEntry {
private final String fieldName;
- private final EnumMap resultMap;
-
- ResultEntry(String fieldName) {
+ private final MemoizedResult indexed;
+ private final MemoizedResult reverseIndexed;
+ private final MemoizedResult stored;
+ private final MemoizedResult indexedOnly;
+ private final MemoizedResult tokenized;
+ private final MemoizedResult reverseTokenized;
+
+ private CachedEntry(String fieldName) {
this.fieldName = fieldName;
- this.resultMap = new EnumMap<>(AttributeType.class);
+ this.indexed = new MemoizedResult();
+ this.reverseIndexed = new MemoizedResult();
+ this.stored = new MemoizedResult();
+ this.indexedOnly = new MemoizedResult();
+ this.tokenized = new MemoizedResult();
+ this.reverseTokenized = new MemoizedResult();
+ }
+
+ private MemoizedResult get(AttributeType attributeType) {
+ MemoizedResult result;
+ switch (attributeType) {
+ case INDEX_ONLY_FIELD:
+ result = indexedOnly;
+ break;
+ case INDEXED_FIELD:
+ result = indexed;
+ break;
+ case REVERSE_INDEXED_FIELD:
+ result = reverseIndexed;
+ break;
+ case TOKENIZED_FIELD:
+ result = tokenized;
+ break;
+ case REVERSE_TOKENIZED_FIELD:
+ result = reverseTokenized;
+ break;
+ case STORED_FIELD:
+ result = stored;
+ break;
+ default:
+ throw new IllegalArgumentException("Undefined attribute type: " + attributeType);
+ }
+ return result;
}
- boolean resolveResult(AttributeType attributeType, Function evaluateFn) {
- return resultMap.computeIfAbsent(attributeType, (t) -> evaluateFn.apply(fieldName));
+ private class MemoizedResult {
+ private boolean resultEvaluated;
+ private boolean result;
+
+ private boolean getResultOrEvaluate(Predicate evaluateFn) {
+ if (!resultEvaluated) {
+ result = evaluateFn.test(fieldName);
+ resultEvaluated = true;
+ }
+ return result;
+ }
}
}
}
diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/CachingFieldConfigHelperTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/CachingFieldConfigHelperTest.java
index f1d0deaa8fd..b973dea7b24 100644
--- a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/CachingFieldConfigHelperTest.java
+++ b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/CachingFieldConfigHelperTest.java
@@ -1,16 +1,18 @@
package datawave.ingest.data.config;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Function;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
public class CachingFieldConfigHelperTest {
@@ -45,14 +47,32 @@ public void testConstructorWithNonPositiveLimitWillThrow(int limit) {
assertThrows(IllegalArgumentException.class, () -> new CachedFieldConfigHelper(mock(FieldConfigHelper.class), limit));
}
+ @SuppressWarnings("ClassEscapesDefinedScope")
+ @ParameterizedTest
+ @EnumSource(CachedFieldConfigHelper.AttributeType.class)
+ public void testAttributeTypesDoNotThrow(CachedFieldConfigHelper.AttributeType attributeType) {
+ String fieldName = "test";
+ FieldConfigHelper mockHelper = mock(FieldConfigHelper.class);
+ CachedFieldConfigHelper cachedHelper = new CachedFieldConfigHelper(mockHelper, 1);
+ cachedHelper.getFieldResult(attributeType, fieldName, (f) -> true);
+ }
+
@Test
public void testCachingLimitsBetweenFieldsAndAttributeTypes() {
- AtomicLong counter = new AtomicLong();
- CachedFieldConfigHelper helper = new CachedFieldConfigHelper(mock(FieldConfigHelper.class), 2);
- Function fn = (f) -> {
- counter.incrementAndGet();
+ AtomicLong storedCounter = new AtomicLong();
+ AtomicLong indexCounter = new AtomicLong();
+ FieldConfigHelper innerHelper = mock(FieldConfigHelper.class);
+ CachedFieldConfigHelper helper = new CachedFieldConfigHelper(innerHelper, 2);
+
+ when(innerHelper.isStoredField(any())).then((a) -> {
+ storedCounter.incrementAndGet();
+ return true;
+ });
+
+ when(innerHelper.isIndexedField(any())).then((a) -> {
+ indexCounter.incrementAndGet();
return true;
- };
+ });
// following ensures that:
// 1. fields are computed, where appropriate per attribute-type
@@ -60,30 +80,30 @@ public void testCachingLimitsBetweenFieldsAndAttributeTypes() {
// 3. limit blocks results to return if exceeded
// 4. limit functions across attribute-types
- helper.getOrEvaluate(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field1", fn);
- Assertions.assertEquals(1, counter.get(), "field1 should compute result (new field)");
+ helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field1", innerHelper::isStoredField);
+ assertEquals(1, storedCounter.get(), "field1 should compute result (new field)");
- helper.getOrEvaluate(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field1", fn);
- Assertions.assertEquals(1, counter.get(), "field1 repeated (existing field)");
+ helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field1", innerHelper::isStoredField);
+ assertEquals(1, storedCounter.get(), "field1 repeated (existing field)");
- helper.getOrEvaluate(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field2", fn);
- Assertions.assertEquals(2, counter.get(), "field2 should compute result (new field)");
+ helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field2", innerHelper::isStoredField);
+ assertEquals(2, storedCounter.get(), "field2 should compute result (new field)");
- helper.getOrEvaluate(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field2", fn);
- Assertions.assertEquals(2, counter.get(), "field2 repeated (existing)");
+ helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field2", innerHelper::isStoredField);
+ assertEquals(2, storedCounter.get(), "field2 repeated (existing)");
- helper.getOrEvaluate(CachedFieldConfigHelper.AttributeType.INDEXED_FIELD, "field1", fn);
- Assertions.assertEquals(3, counter.get(), "field1 should compute result (new attribute)");
+ helper.getFieldResult(CachedFieldConfigHelper.AttributeType.INDEXED_FIELD, "field1", innerHelper::isIndexedField);
+ assertEquals(1, indexCounter.get(), "field1 should compute result (new attribute)");
- helper.getOrEvaluate(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field3", fn);
- Assertions.assertEquals(4, counter.get(), "field3 exceeded limit (new field)");
+ helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field3", innerHelper::isStoredField);
+ assertEquals(3, storedCounter.get(), "field3 exceeded limit (new field)");
- helper.getOrEvaluate(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field3", fn);
- Assertions.assertEquals(4, counter.get(), "field3 exceeded limit (existing field)");
+ helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field3", innerHelper::isStoredField);
+ assertEquals(3, storedCounter.get(), "field3 exceeded limit (existing field)");
// LRU map should evict field #2
// we access field #1 above which has more accesses over field #2
- helper.getOrEvaluate(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field2", fn);
- Assertions.assertEquals(5, counter.get(), "field1 exceeded limit (new field/eviction)");
+ helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field2", innerHelper::isStoredField);
+ assertEquals(4, storedCounter.get(), "field1 exceeded limit (new field/eviction)");
}
}
From 0c4d1685b6c6b8727cb5c4002557ffc0ea7a1365 Mon Sep 17 00:00:00 2001
From: dtspence <33552925+dtspence@users.noreply.github.com>
Date: Fri, 1 Nov 2024 21:48:58 +0000
Subject: [PATCH 4/8] Use LinkedHashMap as LRU implementation
---
.../data/config/CachedFieldConfigHelper.java | 21 ++++++++++++++-----
1 file changed, 16 insertions(+), 5 deletions(-)
diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/CachedFieldConfigHelper.java b/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/CachedFieldConfigHelper.java
index ec3e602e717..7f606cf20c5 100644
--- a/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/CachedFieldConfigHelper.java
+++ b/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/CachedFieldConfigHelper.java
@@ -1,13 +1,14 @@
package datawave.ingest.data.config;
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;
-import org.apache.commons.collections4.map.LRUMap;
-
-import com.google.common.annotations.VisibleForTesting;
-
public class CachedFieldConfigHelper implements FieldConfigHelper {
+ private final static float DEFAULT_LRU_LF = 0.75f;
+
private final FieldConfigHelper underlyingHelper;
private final Map resultCache;
@@ -20,7 +21,7 @@ public CachedFieldConfigHelper(FieldConfigHelper helper, int limit) {
throw new IllegalArgumentException("Limit must be a positive integer");
}
this.underlyingHelper = helper;
- this.resultCache = new LRUMap<>(limit);
+ this.resultCache = lruCache(limit);
}
@Override
@@ -58,6 +59,16 @@ boolean getFieldResult(AttributeType attributeType, String fieldName, Predicate<
return resultCache.computeIfAbsent(fieldName, CachedEntry::new).get(attributeType).getResultOrEvaluate(fn);
}
+ static Map lruCache(final int maxSize) {
+ // Testing showed slightly better or same performance of LRU implementation below
+ // when compared to Apache Commons LRUMap
+ return new LinkedHashMap<>((int) (maxSize / DEFAULT_LRU_LF) + 1, DEFAULT_LRU_LF, true) {
+ protected boolean removeEldestEntry(Map.Entry eldest) {
+ return size() > maxSize;
+ }
+ };
+ }
+
private static class CachedEntry {
private final String fieldName;
private final MemoizedResult indexed;
From 01f12a320697a12c85b7a089e88491a8a015601d Mon Sep 17 00:00:00 2001
From: dtspence <33552925+dtspence@users.noreply.github.com>
Date: Mon, 4 Nov 2024 21:21:04 +0000
Subject: [PATCH 5/8] Rename unit test for main class name
---
...ldConfigHelperTest.java => CachedFieldConfigHelperTest.java} | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
rename warehouse/ingest-core/src/test/java/datawave/ingest/data/config/{CachingFieldConfigHelperTest.java => CachedFieldConfigHelperTest.java} (99%)
diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/CachingFieldConfigHelperTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/CachedFieldConfigHelperTest.java
similarity index 99%
rename from warehouse/ingest-core/src/test/java/datawave/ingest/data/config/CachingFieldConfigHelperTest.java
rename to warehouse/ingest-core/src/test/java/datawave/ingest/data/config/CachedFieldConfigHelperTest.java
index b973dea7b24..18837a7c1c2 100644
--- a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/CachingFieldConfigHelperTest.java
+++ b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/CachedFieldConfigHelperTest.java
@@ -15,7 +15,7 @@
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
-public class CachingFieldConfigHelperTest {
+public class CachedFieldConfigHelperTest {
@Test
public void testCachingBehaviorWillCallBaseMethods() {
String fieldName = "test";
From 09264921671c78f7cf02425a97aa084c2b080035 Mon Sep 17 00:00:00 2001
From: dtspence <33552925+dtspence@users.noreply.github.com>
Date: Thu, 7 Nov 2024 20:57:30 +0000
Subject: [PATCH 6/8] Logging to show if field limit has been exceeded for
debugging
---
.../data/config/CachedFieldConfigHelper.java | 31 +++++++++++++++++--
.../data/config/ingest/BaseIngestHelper.java | 16 +++++-----
.../config/CachedFieldConfigHelperTest.java | 7 ++++-
3 files changed, 44 insertions(+), 10 deletions(-)
diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/CachedFieldConfigHelper.java b/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/CachedFieldConfigHelper.java
index 7f606cf20c5..d72408ce789 100644
--- a/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/CachedFieldConfigHelper.java
+++ b/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/CachedFieldConfigHelper.java
@@ -1,27 +1,49 @@
package datawave.ingest.data.config;
import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.function.Function;
import java.util.function.Predicate;
public class CachedFieldConfigHelper implements FieldConfigHelper {
+ private final static Logger log = LoggerFactory.getLogger(CachedFieldConfigHelper.class);
+
private final static float DEFAULT_LRU_LF = 0.75f;
+ private final static int EMIT_OVER_LIMIT_THRESHOLD = 100;
private final FieldConfigHelper underlyingHelper;
private final Map resultCache;
+ private final Function resultEntryFn;
+
+ private long fieldComputes;
+ private boolean fieldLimitExceeded;
enum AttributeType {
INDEXED_FIELD, REVERSE_INDEXED_FIELD, TOKENIZED_FIELD, REVERSE_TOKENIZED_FIELD, STORED_FIELD, INDEX_ONLY_FIELD
}
public CachedFieldConfigHelper(FieldConfigHelper helper, int limit) {
+ this(helper, limit, false);
+ }
+
+ public CachedFieldConfigHelper(FieldConfigHelper helper, int limit, boolean debugLimitExceeded) {
if (limit < 1) {
throw new IllegalArgumentException("Limit must be a positive integer");
}
this.underlyingHelper = helper;
this.resultCache = lruCache(limit);
+ this.resultEntryFn = !debugLimitExceeded ? CachedEntry::new : (String f) -> {
+ fieldComputes++;
+ if (fieldComputes >= limit && ((fieldComputes == limit) || (fieldComputes % EMIT_OVER_LIMIT_THRESHOLD) == 0)) {
+ fieldLimitExceeded = true;
+ log.info("Field cache limit exceeded [val: {}, size={}, limit={}]", f, fieldComputes, limit);
+ }
+ return new CachedEntry(f);
+ };
}
@Override
@@ -56,10 +78,15 @@ public boolean isReverseTokenizedField(String fieldName) {
@VisibleForTesting
boolean getFieldResult(AttributeType attributeType, String fieldName, Predicate fn) {
- return resultCache.computeIfAbsent(fieldName, CachedEntry::new).get(attributeType).getResultOrEvaluate(fn);
+ return resultCache.computeIfAbsent(fieldName, resultEntryFn).get(attributeType).getResultOrEvaluate(fn);
+ }
+
+ @VisibleForTesting
+ boolean hasLimitExceeded() {
+ return fieldLimitExceeded;
}
- static Map lruCache(final int maxSize) {
+ private static Map lruCache(final int maxSize) {
// Testing showed slightly better or same performance of LRU implementation below
// when compared to Apache Commons LRUMap
return new LinkedHashMap<>((int) (maxSize / DEFAULT_LRU_LF) + 1, DEFAULT_LRU_LF, true) {
diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/BaseIngestHelper.java b/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/BaseIngestHelper.java
index 767566abb7b..7349740b0ba 100644
--- a/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/BaseIngestHelper.java
+++ b/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/BaseIngestHelper.java
@@ -141,12 +141,10 @@ public abstract class BaseIngestHelper extends AbstractIngestHelper implements C
public static final String FIELD_CONFIG_FILE = ".data.category.field.config.file";
public static final String FIELD_CONFIG_CACHE_ENABLED = ".data.category.field.config.cache.enabled";
public static final String FIELD_CONFIG_CACHE_KEY_LIMIT = ".data.category.field.config.cache.limit";
+ public static final String FIELD_CONFIG_CACHE_KEY_LIMIT_DEBUG = ".data.category.field.config.cache.limit.debug";
private static final Logger log = ThreadConfigurableLogger.getLogger(BaseIngestHelper.class);
- private static final boolean DEFAULT_FIELD_CACHE_ENABLED = false;
- private static final int DEFAULT_FIELD_CACHE_LIMIT = 100;
-
private Multimap> typeFieldMap = null;
private Multimap> typePatternMap = null;
private TreeMultimap> typeCompiledPatternMap = null;
@@ -261,17 +259,21 @@ public void setup(Configuration config) {
// Load the field helper, which takes precedence over the individual field configurations
final String fieldConfigFile = config.get(this.getType().typeName() + FIELD_CONFIG_FILE);
if (fieldConfigFile != null) {
- final boolean fieldConfigCacheEnabled = config.getBoolean(this.getType().typeName() + FIELD_CONFIG_CACHE_ENABLED, DEFAULT_FIELD_CACHE_ENABLED);
- final int fieldConfigCacheLimit = config.getInt(this.getType().typeName() + FIELD_CONFIG_CACHE_KEY_LIMIT, DEFAULT_FIELD_CACHE_LIMIT);
+ final boolean fieldConfigCacheEnabled = config.getBoolean(this.getType().typeName() + FIELD_CONFIG_CACHE_ENABLED, false);
+ final boolean fieldConfigCacheLimitDebug = config.getBoolean(this.getType().typeName() + FIELD_CONFIG_CACHE_KEY_LIMIT_DEBUG, false);
+ final int fieldConfigCacheLimit = config.getInt(this.getType().typeName() + FIELD_CONFIG_CACHE_KEY_LIMIT, 100);
if (log.isDebugEnabled()) {
log.debug("Field config file " + fieldConfigFile + " specified for: " + this.getType().typeName() + FIELD_CONFIG_FILE);
log.debug("Field config cache enabled: " + fieldConfigCacheEnabled);
if (fieldConfigCacheEnabled) {
log.debug("Field config cache limit: " + fieldConfigCacheLimit);
+ log.debug("Field config cache limit debug: " + fieldConfigCacheLimitDebug);
}
}
- final FieldConfigHelper baseHelper = XMLFieldConfigHelper.load(fieldConfigFile, this);
- fieldConfigHelper = fieldConfigCacheEnabled ? new CachedFieldConfigHelper(baseHelper, fieldConfigCacheLimit) : baseHelper;
+ fieldConfigHelper = XMLFieldConfigHelper.load(fieldConfigFile, this);
+ if (fieldConfigCacheEnabled) {
+ fieldConfigHelper = new CachedFieldConfigHelper(fieldConfigHelper, fieldConfigCacheLimit, fieldConfigCacheLimitDebug);
+ }
}
// Process the indexed fields
diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/CachedFieldConfigHelperTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/CachedFieldConfigHelperTest.java
index 18837a7c1c2..3c878c350f7 100644
--- a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/CachedFieldConfigHelperTest.java
+++ b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/CachedFieldConfigHelperTest.java
@@ -1,7 +1,9 @@
package datawave.ingest.data.config;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
@@ -62,7 +64,7 @@ public void testCachingLimitsBetweenFieldsAndAttributeTypes() {
AtomicLong storedCounter = new AtomicLong();
AtomicLong indexCounter = new AtomicLong();
FieldConfigHelper innerHelper = mock(FieldConfigHelper.class);
- CachedFieldConfigHelper helper = new CachedFieldConfigHelper(innerHelper, 2);
+ CachedFieldConfigHelper helper = new CachedFieldConfigHelper(innerHelper, 2, true);
when(innerHelper.isStoredField(any())).then((a) -> {
storedCounter.incrementAndGet();
@@ -82,12 +84,15 @@ public void testCachingLimitsBetweenFieldsAndAttributeTypes() {
helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field1", innerHelper::isStoredField);
assertEquals(1, storedCounter.get(), "field1 should compute result (new field)");
+ assertFalse(helper.hasLimitExceeded());
helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field1", innerHelper::isStoredField);
assertEquals(1, storedCounter.get(), "field1 repeated (existing field)");
+ assertFalse(helper.hasLimitExceeded());
helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field2", innerHelper::isStoredField);
assertEquals(2, storedCounter.get(), "field2 should compute result (new field)");
+ assertTrue(helper.hasLimitExceeded());
helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field2", innerHelper::isStoredField);
assertEquals(2, storedCounter.get(), "field2 repeated (existing)");
From 3d3c425f0a7322b6e801933cc2eb6441331e6ba0 Mon Sep 17 00:00:00 2001
From: dtspence <33552925+dtspence@users.noreply.github.com>
Date: Tue, 10 Dec 2024 22:58:46 +0000
Subject: [PATCH 7/8] Refactoring and logging debug to run as thread
---
.../data/config/CachedFieldConfigHelper.java | 108 +++++++++++++-----
.../config/CachedFieldConfigHelperTest.java | 5 +-
2 files changed, 83 insertions(+), 30 deletions(-)
diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/CachedFieldConfigHelper.java b/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/CachedFieldConfigHelper.java
index d72408ce789..e9cb340aba2 100644
--- a/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/CachedFieldConfigHelper.java
+++ b/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/CachedFieldConfigHelper.java
@@ -1,26 +1,36 @@
package datawave.ingest.data.config;
-import com.google.common.annotations.VisibleForTesting;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static java.lang.Thread.NORM_PRIORITY;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
-import java.util.function.Function;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
public class CachedFieldConfigHelper implements FieldConfigHelper {
private final static Logger log = LoggerFactory.getLogger(CachedFieldConfigHelper.class);
private final static float DEFAULT_LRU_LF = 0.75f;
- private final static int EMIT_OVER_LIMIT_THRESHOLD = 100;
+ private final static int DEFAULT_DEBUG_STATE_SECS = 30;
private final FieldConfigHelper underlyingHelper;
- private final Map resultCache;
- private final Function resultEntryFn;
-
- private long fieldComputes;
- private boolean fieldLimitExceeded;
+ private final LruCache resultCache;
+ private final boolean debugLimitsEnabled;
+ private final int limit;
+ private final Set debugFieldUnique;
+ private final ScheduledExecutorService debugStateExecutor;
+ private final AtomicLong debugFieldComputes;
enum AttributeType {
INDEXED_FIELD, REVERSE_INDEXED_FIELD, TOKENIZED_FIELD, REVERSE_TOKENIZED_FIELD, STORED_FIELD, INDEX_ONLY_FIELD
@@ -30,20 +40,31 @@ public CachedFieldConfigHelper(FieldConfigHelper helper, int limit) {
this(helper, limit, false);
}
- public CachedFieldConfigHelper(FieldConfigHelper helper, int limit, boolean debugLimitExceeded) {
+ public CachedFieldConfigHelper(FieldConfigHelper helper, int limit, boolean debugLimitEnabled) {
if (limit < 1) {
throw new IllegalArgumentException("Limit must be a positive integer");
}
this.underlyingHelper = helper;
- this.resultCache = lruCache(limit);
- this.resultEntryFn = !debugLimitExceeded ? CachedEntry::new : (String f) -> {
- fieldComputes++;
- if (fieldComputes >= limit && ((fieldComputes == limit) || (fieldComputes % EMIT_OVER_LIMIT_THRESHOLD) == 0)) {
- fieldLimitExceeded = true;
- log.info("Field cache limit exceeded [val: {}, size={}, limit={}]", f, fieldComputes, limit);
- }
- return new CachedEntry(f);
- };
+ this.resultCache = new LruCache<>(limit);
+ this.limit = limit;
+ this.debugLimitsEnabled = debugLimitEnabled;
+ this.debugFieldUnique = new HashSet<>();
+ this.debugFieldComputes = new AtomicLong();
+
+ if (debugLimitEnabled) {
+ this.debugStateExecutor = Executors.newSingleThreadScheduledExecutor(
+ // @formatter:off
+ new ThreadFactoryBuilder()
+ .setPriority(NORM_PRIORITY)
+ .setDaemon(true)
+ .setNameFormat("CachedFieldConfigHelper.DebugState")
+ .build()
+ // formatter:off
+ );
+ this.debugStateExecutor.scheduleAtFixedRate(this::debugLogState, DEFAULT_DEBUG_STATE_SECS, DEFAULT_DEBUG_STATE_SECS, SECONDS);
+ } else {
+ this.debugStateExecutor = null;
+ }
}
@Override
@@ -78,22 +99,51 @@ public boolean isReverseTokenizedField(String fieldName) {
@VisibleForTesting
boolean getFieldResult(AttributeType attributeType, String fieldName, Predicate fn) {
- return resultCache.computeIfAbsent(fieldName, resultEntryFn).get(attributeType).getResultOrEvaluate(fn);
+ CachedEntry ce = !debugLimitsEnabled ?
+ resultCache.computeIfAbsent(fieldName, CachedEntry::new) :
+ resultCache.computeIfAbsent(fieldName, this::debugCachedEntryCreation);
+ return ce.get(attributeType).getResultOrEvaluate(fn);
}
@VisibleForTesting
boolean hasLimitExceeded() {
- return fieldLimitExceeded;
+ return resultCache.hasLimitExceeded();
}
- private static Map lruCache(final int maxSize) {
- // Testing showed slightly better or same performance of LRU implementation below
- // when compared to Apache Commons LRUMap
- return new LinkedHashMap<>((int) (maxSize / DEFAULT_LRU_LF) + 1, DEFAULT_LRU_LF, true) {
- protected boolean removeEldestEntry(Map.Entry eldest) {
- return size() > maxSize;
+ private CachedEntry debugCachedEntryCreation(String fieldName) {
+ debugFieldComputes.incrementAndGet();
+ debugFieldUnique.add(fieldName);
+ return new CachedEntry(fieldName);
+ }
+
+ private void debugLogState() {
+ if (resultCache.hasLimitExceeded()) {
+ log.info("Field cache LRU limit exceeded [limit={}, debug={}, size={}, uniq={}]",
+ limit, debugFieldComputes.get(), debugFieldUnique.size(), debugLimitsEnabled);
+ }
+ }
+
+ private static class LruCache extends LinkedHashMap {
+ private final int maxSize;
+ private volatile boolean limitExceeded;
+
+ LruCache(int maxSize) {
+ super((int)(maxSize / DEFAULT_LRU_LF) + 1, DEFAULT_LRU_LF, true);
+ this.maxSize = maxSize;
+ }
+
+ boolean hasLimitExceeded() {
+ // thread-safe
+ return limitExceeded;
+ }
+
+ protected boolean removeEldestEntry(Map.Entry eldest) {
+ boolean localLimitExceeded = size() > maxSize;
+ if (localLimitExceeded) {
+ limitExceeded = true;
}
- };
+ return localLimitExceeded;
+ }
}
private static class CachedEntry {
diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/CachedFieldConfigHelperTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/CachedFieldConfigHelperTest.java
index 3c878c350f7..e040edccd95 100644
--- a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/CachedFieldConfigHelperTest.java
+++ b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/CachedFieldConfigHelperTest.java
@@ -92,16 +92,19 @@ public void testCachingLimitsBetweenFieldsAndAttributeTypes() {
helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field2", innerHelper::isStoredField);
assertEquals(2, storedCounter.get(), "field2 should compute result (new field)");
- assertTrue(helper.hasLimitExceeded());
+ assertFalse(helper.hasLimitExceeded());
helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field2", innerHelper::isStoredField);
assertEquals(2, storedCounter.get(), "field2 repeated (existing)");
+ assertFalse(helper.hasLimitExceeded());
helper.getFieldResult(CachedFieldConfigHelper.AttributeType.INDEXED_FIELD, "field1", innerHelper::isIndexedField);
assertEquals(1, indexCounter.get(), "field1 should compute result (new attribute)");
+ assertFalse(helper.hasLimitExceeded());
helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field3", innerHelper::isStoredField);
assertEquals(3, storedCounter.get(), "field3 exceeded limit (new field)");
+ assertTrue(helper.hasLimitExceeded());
helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field3", innerHelper::isStoredField);
assertEquals(3, storedCounter.get(), "field3 exceeded limit (existing field)");
From c999a7871fb6f5a344e122d98c2f071c1199234b Mon Sep 17 00:00:00 2001
From: dtspence <33552925+dtspence@users.noreply.github.com>
Date: Wed, 8 Jan 2025 14:29:44 +0000
Subject: [PATCH 8/8] Switch cache helper logs to info during setup
---
.../ingest/data/config/ingest/BaseIngestHelper.java | 12 +++++-------
1 file changed, 5 insertions(+), 7 deletions(-)
diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/BaseIngestHelper.java b/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/BaseIngestHelper.java
index 7349740b0ba..df2a069df05 100644
--- a/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/BaseIngestHelper.java
+++ b/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/BaseIngestHelper.java
@@ -262,13 +262,11 @@ public void setup(Configuration config) {
final boolean fieldConfigCacheEnabled = config.getBoolean(this.getType().typeName() + FIELD_CONFIG_CACHE_ENABLED, false);
final boolean fieldConfigCacheLimitDebug = config.getBoolean(this.getType().typeName() + FIELD_CONFIG_CACHE_KEY_LIMIT_DEBUG, false);
final int fieldConfigCacheLimit = config.getInt(this.getType().typeName() + FIELD_CONFIG_CACHE_KEY_LIMIT, 100);
- if (log.isDebugEnabled()) {
- log.debug("Field config file " + fieldConfigFile + " specified for: " + this.getType().typeName() + FIELD_CONFIG_FILE);
- log.debug("Field config cache enabled: " + fieldConfigCacheEnabled);
- if (fieldConfigCacheEnabled) {
- log.debug("Field config cache limit: " + fieldConfigCacheLimit);
- log.debug("Field config cache limit debug: " + fieldConfigCacheLimitDebug);
- }
+ log.info("Field config file " + fieldConfigFile + " specified for: " + this.getType().typeName() + FIELD_CONFIG_FILE);
+ log.info("Field config cache enabled: " + fieldConfigCacheEnabled);
+ if (fieldConfigCacheEnabled) {
+ log.info("Field config cache limit: " + fieldConfigCacheLimit);
+ log.info("Field config cache limit debug: " + fieldConfigCacheLimitDebug);
}
fieldConfigHelper = XMLFieldConfigHelper.load(fieldConfigFile, this);
if (fieldConfigCacheEnabled) {