Skip to content

Commit

Permalink
#4142 Index shards now cope with field changes
Browse files Browse the repository at this point in the history
  • Loading branch information
stroomdev66 committed Mar 26, 2024
1 parent 438c0f8 commit 33a49c2
Show file tree
Hide file tree
Showing 51 changed files with 785 additions and 540 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class TestIndexShardSearcher extends AbstractCoreIntegrationTest {
// // Create an index
// final DocRef indexRef = commonTestScenarioCreator.createIndex("TEST_2010a");
// final IndexDoc index = indexStore.readDocument(indexRef);
// final IndexShardKey indexShardKey = IndexShardKeyUtil.createTestKey(index);
// final IndexShardKey indexShardKey = IndexShardKey.createTestKey(index);
//
// // Create and close writer.
// final IndexShardWriter writer = indexShardWriterCache.getWriterByShardKey(indexShardKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,13 @@ void testSelectIndexVolume_validGroup() {
// index1.setVolumeGroupName(volumeGroup);
// indexStore.writeDocument(index1);
//
// final IndexShardKey indexShardKey1 = IndexShardKeyUtil.createTestKey(index1);
// final IndexShardKey indexShardKey1 = IndexShardKey.createTestKey(index1);
//
// final DocRef indexRef2 = indexStore.createDocument("Test Index 2");
// final IndexDoc index2 = indexStore.readDocument(indexRef2);
// index2.setVolumeGroupName(volumeGroup);
// indexStore.writeDocument(index2);
// final IndexShardKey indexShardKey2 = IndexShardKeyUtil.createTestKey(index2);
// final IndexShardKey indexShardKey2 = IndexShardKey.createTestKey(index2);
//
// final String nodeName = nodeInfo.getThisNodeName();
//
Expand Down Expand Up @@ -193,7 +193,7 @@ void testSelectIndexVolume_validGroup() {
// final String dateTime,
// final int shardNo) {
// final long timeMs = DateUtil.parseNormalDateTimeString(dateTime);
// final IndexShardKey key = IndexShardKeyUtil.createTimeBasedKey(index, timeMs, shardNo);
// final IndexShardKey key = IndexShardKey.createTimeBasedKey(index, timeMs, shardNo);
// indexShardService.createIndexShard(key, nodeName);
// }
}
99 changes: 85 additions & 14 deletions stroom-app/src/test/java/stroom/index/TestIndexShardWriterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@

package stroom.index;

import stroom.datasource.api.v2.AnalyzerType;
import stroom.datasource.api.v2.FieldType;
import stroom.docref.DocRef;
import stroom.entity.shared.ExpressionCriteria;
import stroom.index.impl.IndexDocument;
import stroom.index.impl.IndexShardKeyUtil;
import stroom.index.impl.IndexShardManager;
import stroom.index.impl.IndexShardManager.IndexShardAction;
import stroom.index.impl.IndexShardService;
Expand All @@ -37,12 +38,15 @@
import stroom.index.shared.IndexVolume;
import stroom.index.shared.LuceneIndexDoc;
import stroom.index.shared.LuceneIndexField;
import stroom.node.api.NodeInfo;
import stroom.query.language.functions.ValInteger;
import stroom.query.language.functions.ValString;
import stroom.search.extraction.FieldValue;
import stroom.test.AbstractCoreIntegrationTest;
import stroom.test.CommonTestControl;
import stroom.test.CommonTestScenarioCreator;
import stroom.util.shared.ResultPage;
import stroom.util.shared.Selection;

import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -75,6 +79,8 @@ class TestIndexShardWriterImpl extends AbstractCoreIntegrationTest {
private IndexStore indexStore;
@Inject
private IndexVolumeDao indexVolumeDao;
@Inject
private NodeInfo nodeInfo;

@BeforeEach
void onBefore() {
Expand All @@ -92,10 +98,11 @@ void testSingle() {
// Create an index
final DocRef indexRef1 = commonTestScenarioCreator.createIndex("TEST_2010a");
final LuceneIndexDoc index1 = indexStore.readDocument(indexRef1);
final IndexShardKey indexShardKey1 = IndexShardKeyUtil.createTestKey(index1);
final IndexShardKey indexShardKey1 = IndexShardKey.createKey(index1);
final IndexShard indexShard = indexShardService.createIndexShard(indexShardKey1, nodeInfo.getThisNodeName());

// Create a writer in the pool
final IndexShardWriter writer1 = indexShardWriterCache.getWriterByShardKey(indexShardKey1);
final IndexShardWriter writer1 = indexShardWriterCache.getWriter(indexShard.getId());

// Assert that there is 1 writer in the pool.
assertThat(indexShardService.find(FindIndexShardCriteria.matchAll()).size()).isEqualTo(1);
Expand Down Expand Up @@ -138,15 +145,17 @@ void testSimple() {

final DocRef indexRef1 = commonTestScenarioCreator.createIndex("TEST_2010");
final LuceneIndexDoc index1 = indexStore.readDocument(indexRef1);
final IndexShardKey indexShardKey1 = IndexShardKeyUtil.createTestKey(index1);
final IndexShardKey indexShardKey1 = IndexShardKey.createKey(index1);
final IndexShard indexShard1 = indexShardService.createIndexShard(indexShardKey1, nodeInfo.getThisNodeName());

final DocRef indexRef2 = commonTestScenarioCreator.createIndex("TEST_2011");
final LuceneIndexDoc index2 = indexStore.readDocument(indexRef2);
final IndexShardKey indexShardKey2 = IndexShardKeyUtil.createTestKey(index2);
final IndexShardKey indexShardKey2 = IndexShardKey.createKey(index2);
final IndexShard indexShard2 = indexShardService.createIndexShard(indexShardKey2, nodeInfo.getThisNodeName());

// Create 2 writers in the pool.
final IndexShardWriter writer1 = indexShardWriterCache.getWriterByShardKey(indexShardKey1);
final IndexShardWriter writer2 = indexShardWriterCache.getWriterByShardKey(indexShardKey2);
final IndexShardWriter writer1 = indexShardWriterCache.getWriter(indexShard1.getId());
final IndexShardWriter writer2 = indexShardWriterCache.getWriter(indexShard2.getId());

// Assert that there are 2 writers in the pool.
assertThat(indexShardService.find(FindIndexShardCriteria.matchAll()).size()).isEqualTo(2);
Expand Down Expand Up @@ -208,15 +217,17 @@ void testSimpleRoll() {
commonTestScenarioCreator.createIndexFields(),
10);
final LuceneIndexDoc index1 = indexStore.readDocument(indexRef1);
final IndexShardKey indexShardKey1 = IndexShardKeyUtil.createTestKey(index1);

final IndexShardWriter writer1 = indexShardWriterCache.getWriterByShardKey(indexShardKey1);
final IndexShardKey indexShardKey1 = IndexShardKey.createKey(index1);

for (int i = 0; i < 10; i++) {
assertThat(indexShardWriterCache.getWriterByShardKey(indexShardKey1)).isEqualTo(writer1);
indexer.addDocument(indexShardKey1, document);
}

final ResultPage<IndexShard> indexShardResultPage = indexShardService.find(FindIndexShardCriteria.matchAll());
assertThat(indexShardResultPage.size()).isOne();
final IndexShard indexShard1 = indexShardResultPage.getFirst();
final IndexShardWriter writer1 = indexShardWriterCache.getWriter(indexShard1.getId());

// Make sure the writer is full.
assertThatThrownBy(() -> writer1.addDocument(document)).isInstanceOf(IndexException.class);

Expand All @@ -227,13 +238,13 @@ void testSimpleRoll() {
indexer.addDocument(indexShardKey1, document);

// Get the new writer.
final IndexShardWriter writer2 = indexShardWriterCache.getWriterByShardKey(indexShardKey1);
final IndexShardWriter writer2 = indexShardWriterCache.getWriter(indexShard1.getId());

// Make sure the writers are not the same.
assertThat(writer2).isNotEqualTo(writer1);

for (int i = 1; i < 10; i++) {
assertThat(indexShardWriterCache.getWriterByShardKey(indexShardKey1)).isEqualTo(writer2);
assertThat(indexShardWriterCache.getWriter(indexShard1.getId())).isEqualTo(writer2);
indexer.addDocument(indexShardKey1, document);
}

Expand Down Expand Up @@ -265,13 +276,73 @@ void testFileSystemError() throws IOException {
commonTestScenarioCreator.createIndexFields(),
10);
final LuceneIndexDoc index1 = indexStore.readDocument(indexRef1);
final IndexShardKey indexShardKey1 = IndexShardKeyUtil.createTestKey(index1);
final IndexShardKey indexShardKey1 = IndexShardKey.createKey(index1);

assertThatThrownBy(() -> indexer.addDocument(indexShardKey1, document))
.isInstanceOf(RuntimeException.class)
.hasMessageContaining("Index volume path not found");
}

@Test
void testChangeFieldType() {
assertThat(indexShardService.find(FindIndexShardCriteria.matchAll()).size()).isZero();

final DocRef indexRef1 = commonTestScenarioCreator.createIndex("TEST_2010",
commonTestScenarioCreator.createIndexFields(),
10);
final LuceneIndexDoc index1 = indexStore.readDocument(indexRef1);
final IndexShardKey indexShardKey1 = IndexShardKey.createKey(index1);

final Selection<IndexShardStatus> statusSelection = Selection.selectNone();
statusSelection.addAll(Set.of(IndexShardStatus.CLOSED, IndexShardStatus.OPEN));
final FindIndexShardCriteria criteria = FindIndexShardCriteria
.builder()
.indexShardStatusSet(statusSelection)
.build();

final IndexDocument document1 = new IndexDocument();
document1.add(new FieldValue(LuceneIndexField
.builder()
.fldName("SourcePort")
.fldType(FieldType.TEXT)
.analyzerType(AnalyzerType.ALPHA_NUMERIC)
.termPositions(false)
.indexed(true)
.stored(false)
.build(), ValString.create("12345")));
indexer.addDocument(indexShardKey1, document1);
indexer.addDocument(indexShardKey1, document1);
assertThat(indexShardService.find(criteria).size()).isOne();

final IndexDocument document2 = new IndexDocument();
document2.add(new FieldValue(LuceneIndexField
.builder()
.fldName("SourcePort")
.fldType(FieldType.INTEGER)
.analyzerType(AnalyzerType.KEYWORD)
.termPositions(false)
.indexed(true)
.stored(false)
.build(), ValInteger.create(12345)));
indexer.addDocument(indexShardKey1, document2);
indexer.addDocument(indexShardKey1, document2);
assertThat(indexShardService.find(criteria).size()).isEqualTo(2);

final IndexDocument document3 = new IndexDocument();
document3.add(new FieldValue(LuceneIndexField
.builder()
.fldName("SourcePort")
.fldType(FieldType.TEXT)
.analyzerType(AnalyzerType.KEYWORD)
.termPositions(true)
.indexed(true)
.stored(false)
.build(), ValString.create("12345")));
indexer.addDocument(indexShardKey1, document3);
indexer.addDocument(indexShardKey1, document3);
assertThat(indexShardService.find(criteria).size()).isEqualTo(3);
}

private void checkDocCount(final int expected, final IndexShardWriter indexShardWriter) {
assertThat(indexShardWriter.getDocumentCount()).isEqualTo(expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@
import stroom.docref.DocRef;
import stroom.index.impl.IndexDocument;
import stroom.index.impl.IndexFields;
import stroom.index.impl.IndexShardKeyUtil;
import stroom.index.impl.IndexShardWriter;
import stroom.index.impl.IndexStore;
import stroom.index.mock.MockIndexShardWriter;
import stroom.index.mock.MockIndexShardWriterCache;
import stroom.index.shared.IndexShardKey;
import stroom.index.shared.LuceneIndexDoc;
import stroom.index.shared.LuceneIndexField;
import stroom.pipeline.PipelineStore;
Expand Down Expand Up @@ -248,10 +246,8 @@ private List<IndexDocument> doTest(final String resourceName, final List<LuceneI
}

// Wrote anything ?
final IndexShardKey indexShardKey = IndexShardKeyUtil.createTestKey(index);
if (indexShardWriterCache.getWriters().size() > 0) {
if (!indexShardWriterCache.getWriters().isEmpty()) {
assertThat(indexShardWriterCache.getWriters().size()).isEqualTo(1);
assertThat(indexShardWriterCache.getWriters().containsKey(indexShardKey)).isTrue();

// Get a writer from the pool.
for (final IndexShardWriter writer : indexShardWriterCache.getWriters().values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import stroom.index.impl.IndexStore;
import stroom.index.mock.MockIndexShardWriter;
import stroom.index.mock.MockIndexShardWriterCache;
import stroom.index.shared.IndexShardKey;
import stroom.index.shared.LuceneIndexDoc;
import stroom.index.shared.LuceneIndexField;
import stroom.meta.shared.Meta;
Expand Down Expand Up @@ -152,7 +151,7 @@ void testSimple() {
assertThat(indexShardWriterCache.getWriters().size()).isEqualTo(1);

// Get the writer from the pool.
final Map<IndexShardKey, IndexShardWriter> writers = indexShardWriterCache.getWriters();
final Map<Long, IndexShardWriter> writers = indexShardWriterCache.getWriters();
final MockIndexShardWriter writer = (MockIndexShardWriter) writers.values().iterator().next();

// Check that we indexed 4 documents.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class TestBasicSearch extends AbstractCoreIntegrationTest {
// final String indexName = "TEST";
// final DocRef indexRef = commonTestScenarioCreator.createIndex(indexName, indexFields);
// final IndexDoc index = indexStore.readDocument(indexRef);
// final IndexShardKey indexShardKey = IndexShardKeyUtil.createTestKey(index);
// final IndexShardKey indexShardKey = IndexShardKey.createTestKey(index);
//
// // Do some work.
// for (int i = 1; i <= indexTestSize; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,11 @@ appConfig:
refreshAfterWrite: null
ramBufferSizeMB: 1024
writer:
activeShardCache:
expireAfterAccess: null
expireAfterWrite: "PT1H"
maximumSize: 100
refreshAfterWrite: null
cache:
coreItems: 50
maxItems: 100
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,37 +243,37 @@ stroom.importexport.impl.ExportConfig getExportConfig(
@Generated("stroom.config.global.impl.GenerateConfigProvidersModule")
@Provides
@SuppressWarnings("unused")
stroom.index.impl.IndexCacheConfig getIndexCacheConfig(
stroom.index.impl.IndexConfig getIndexConfig(
final ConfigMapper configMapper) {
return configMapper.getConfigObject(
stroom.index.impl.IndexCacheConfig.class);
stroom.index.impl.IndexConfig.class);
}

@Generated("stroom.config.global.impl.GenerateConfigProvidersModule")
@Provides
@SuppressWarnings("unused")
stroom.index.impl.IndexConfig getIndexConfig(
stroom.index.impl.IndexFieldDbConfig getIndexFieldDbConfig(
final ConfigMapper configMapper) {
return configMapper.getConfigObject(
stroom.index.impl.IndexConfig.class);
stroom.index.impl.IndexFieldDbConfig.class);
}

@Generated("stroom.config.global.impl.GenerateConfigProvidersModule")
@Provides
@SuppressWarnings("unused")
stroom.index.impl.IndexFieldDbConfig getIndexFieldDbConfig(
stroom.index.impl.IndexShardSearchConfig getIndexShardSearchConfig(
final ConfigMapper configMapper) {
return configMapper.getConfigObject(
stroom.index.impl.IndexFieldDbConfig.class);
stroom.index.impl.IndexShardSearchConfig.class);
}

@Generated("stroom.config.global.impl.GenerateConfigProvidersModule")
@Provides
@SuppressWarnings("unused")
stroom.index.impl.IndexShardSearchConfig getIndexShardSearchConfig(
stroom.index.impl.IndexShardWriterCacheConfig getIndexShardWriterCacheConfig(
final ConfigMapper configMapper) {
return configMapper.getConfigObject(
stroom.index.impl.IndexShardSearchConfig.class);
stroom.index.impl.IndexShardWriterCacheConfig.class);
}

@Generated("stroom.config.global.impl.GenerateConfigProvidersModule")
Expand Down
Loading

0 comments on commit 33a49c2

Please sign in to comment.