Skip to content

Commit

Permalink
Merge branch 'gh-4151_index_shard_management' of github.com:gchq/stro…
Browse files Browse the repository at this point in the history
…om into 7.3
  • Loading branch information
stroomdev66 committed Mar 26, 2024
2 parents 34bc1e1 + 3c3a73a commit 947389e
Show file tree
Hide file tree
Showing 59 changed files with 968 additions and 677 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class TestIndexShardSearcher extends AbstractCoreIntegrationTest {
// // Create an index
// final DocRef indexRef = commonTestScenarioCreator.createIndex("TEST_2010a");
// final IndexDoc index = indexStore.readDocument(indexRef);
// final IndexShardKey indexShardKey = IndexShardKeyUtil.createTestKey(index);
// final IndexShardKey indexShardKey = IndexShardKey.createTestKey(index);
//
// // Create and close writer.
// final IndexShardWriter writer = indexShardWriterCache.getWriterByShardKey(indexShardKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,13 @@ void testSelectIndexVolume_validGroup() {
// index1.setVolumeGroupName(volumeGroup);
// indexStore.writeDocument(index1);
//
// final IndexShardKey indexShardKey1 = IndexShardKeyUtil.createTestKey(index1);
// final IndexShardKey indexShardKey1 = IndexShardKey.createTestKey(index1);
//
// final DocRef indexRef2 = indexStore.createDocument("Test Index 2");
// final IndexDoc index2 = indexStore.readDocument(indexRef2);
// index2.setVolumeGroupName(volumeGroup);
// indexStore.writeDocument(index2);
// final IndexShardKey indexShardKey2 = IndexShardKeyUtil.createTestKey(index2);
// final IndexShardKey indexShardKey2 = IndexShardKey.createTestKey(index2);
//
// final String nodeName = nodeInfo.getThisNodeName();
//
Expand Down Expand Up @@ -193,7 +193,7 @@ void testSelectIndexVolume_validGroup() {
// final String dateTime,
// final int shardNo) {
// final long timeMs = DateUtil.parseNormalDateTimeString(dateTime);
// final IndexShardKey key = IndexShardKeyUtil.createTimeBasedKey(index, timeMs, shardNo);
// final IndexShardKey key = IndexShardKey.createTimeBasedKey(index, timeMs, shardNo);
// indexShardService.createIndexShard(key, nodeName);
// }
}
196 changes: 166 additions & 30 deletions stroom-app/src/test/java/stroom/index/TestIndexShardWriterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,47 @@

package stroom.index;

import stroom.datasource.api.v2.AnalyzerType;
import stroom.datasource.api.v2.FieldType;
import stroom.docref.DocRef;
import stroom.entity.shared.ExpressionCriteria;
import stroom.index.impl.IndexDocument;
import stroom.index.impl.IndexShardKeyUtil;
import stroom.index.impl.IndexShardManager;
import stroom.index.impl.IndexShardManager.IndexShardAction;
import stroom.index.impl.IndexShardService;
import stroom.index.impl.IndexShardWriter;
import stroom.index.impl.IndexShardWriterCache;
import stroom.index.impl.IndexStore;
import stroom.index.impl.IndexVolumeDao;
import stroom.index.impl.Indexer;
import stroom.index.shared.FindIndexShardCriteria;
import stroom.index.shared.IndexException;
import stroom.index.shared.IndexShard;
import stroom.index.shared.IndexShard.IndexShardStatus;
import stroom.index.shared.IndexShardKey;
import stroom.index.shared.IndexVolume;
import stroom.index.shared.LuceneIndexDoc;
import stroom.index.shared.LuceneIndexField;
import stroom.node.api.NodeInfo;
import stroom.query.language.functions.ValInteger;
import stroom.query.language.functions.ValString;
import stroom.search.extraction.FieldValue;
import stroom.test.AbstractCoreIntegrationTest;
import stroom.test.CommonTestControl;
import stroom.test.CommonTestScenarioCreator;
import stroom.util.shared.ResultPage;
import stroom.util.shared.Selection;

import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermission;
import java.util.Set;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand All @@ -65,14 +78,18 @@ class TestIndexShardWriterImpl extends AbstractCoreIntegrationTest {
private CommonTestControl commonTestControl;
@Inject
private IndexStore indexStore;
@Inject
private IndexVolumeDao indexVolumeDao;
@Inject
private NodeInfo nodeInfo;

@BeforeEach
void onBefore() {
indexShardWriterCache.shutdown();
}

@Test
void testSingle() throws IOException {
void testSingle() {
assertThat(indexShardService.find(FindIndexShardCriteria.matchAll()).size()).isZero();

// Do some work.
Expand All @@ -82,10 +99,11 @@ void testSingle() throws IOException {
// Create an index
final DocRef indexRef1 = commonTestScenarioCreator.createIndex("TEST_2010a");
final LuceneIndexDoc index1 = indexStore.readDocument(indexRef1);
final IndexShardKey indexShardKey1 = IndexShardKeyUtil.createTestKey(index1);
final IndexShardKey indexShardKey1 = IndexShardKey.createKey(index1);
final IndexShard indexShard = indexShardService.createIndexShard(indexShardKey1, nodeInfo.getThisNodeName());

// Create a writer in the pool
final IndexShardWriter writer1 = indexShardWriterCache.getWriterByShardKey(indexShardKey1);
final IndexShardWriter writer1 = indexShardWriterCache.getWriter(indexShard.getId());

// Assert that there is 1 writer in the pool.
assertThat(indexShardService.find(FindIndexShardCriteria.matchAll()).size()).isEqualTo(1);
Expand Down Expand Up @@ -119,7 +137,7 @@ void testSingle() throws IOException {
}

@Test
void testSimple() throws IOException {
void testSimple() {
assertThat(indexShardService.find(FindIndexShardCriteria.matchAll()).size()).isZero();

// Do some work.
Expand All @@ -128,15 +146,17 @@ void testSimple() throws IOException {

final DocRef indexRef1 = commonTestScenarioCreator.createIndex("TEST_2010");
final LuceneIndexDoc index1 = indexStore.readDocument(indexRef1);
final IndexShardKey indexShardKey1 = IndexShardKeyUtil.createTestKey(index1);
final IndexShardKey indexShardKey1 = IndexShardKey.createKey(index1);
final IndexShard indexShard1 = indexShardService.createIndexShard(indexShardKey1, nodeInfo.getThisNodeName());

final DocRef indexRef2 = commonTestScenarioCreator.createIndex("TEST_2011");
final LuceneIndexDoc index2 = indexStore.readDocument(indexRef2);
final IndexShardKey indexShardKey2 = IndexShardKeyUtil.createTestKey(index2);
final IndexShardKey indexShardKey2 = IndexShardKey.createKey(index2);
final IndexShard indexShard2 = indexShardService.createIndexShard(indexShardKey2, nodeInfo.getThisNodeName());

// Create 2 writers in the pool.
final IndexShardWriter writer1 = indexShardWriterCache.getWriterByShardKey(indexShardKey1);
final IndexShardWriter writer2 = indexShardWriterCache.getWriterByShardKey(indexShardKey2);
final IndexShardWriter writer1 = indexShardWriterCache.getWriter(indexShard1.getId());
final IndexShardWriter writer2 = indexShardWriterCache.getWriter(indexShard2.getId());

// Assert that there are 2 writers in the pool.
assertThat(indexShardService.find(FindIndexShardCriteria.matchAll()).size()).isEqualTo(2);
Expand Down Expand Up @@ -188,22 +208,8 @@ void testSimple() throws IOException {
assertThat(compareStatus(IndexShardStatus.OPEN, writer2.getIndexShardId())).isFalse();
}

private void checkDocCount(final int expected, final IndexShardWriter indexShardWriter) {
assertThat(indexShardWriter.getDocumentCount()).isEqualTo(expected);
}

private void checkDocCount(final int expected, final long indexShardId) {
final IndexShard loaded = indexShardService.loadById(indexShardId);
assertThat(loaded.getDocumentCount()).isEqualTo(expected);
}

private boolean compareStatus(final IndexShardStatus expected, final long indexShardId) {
final IndexShard loaded = indexShardService.loadById(indexShardId);
return expected.equals(loaded.getStatus());
}

@Test
void testSimpleRoll() throws IOException {
void testSimpleRoll() {
// Do some work.
final IndexDocument document = new IndexDocument();
document.add(new FieldValue(LuceneIndexField.createField("test"), ValString.create("test")));
Expand All @@ -212,15 +218,17 @@ void testSimpleRoll() throws IOException {
commonTestScenarioCreator.createIndexFields(),
10);
final LuceneIndexDoc index1 = indexStore.readDocument(indexRef1);
final IndexShardKey indexShardKey1 = IndexShardKeyUtil.createTestKey(index1);

final IndexShardWriter writer1 = indexShardWriterCache.getWriterByShardKey(indexShardKey1);
final IndexShardKey indexShardKey1 = IndexShardKey.createKey(index1);

for (int i = 0; i < 10; i++) {
assertThat(indexShardWriterCache.getWriterByShardKey(indexShardKey1)).isEqualTo(writer1);
indexer.addDocument(indexShardKey1, document);
}

final ResultPage<IndexShard> indexShardResultPage = indexShardService.find(FindIndexShardCriteria.matchAll());
assertThat(indexShardResultPage.size()).isOne();
final IndexShard indexShard1 = indexShardResultPage.getFirst();
final IndexShardWriter writer1 = indexShardWriterCache.getWriter(indexShard1.getId());

// Make sure the writer is full.
assertThatThrownBy(() -> writer1.addDocument(document)).isInstanceOf(IndexException.class);

Expand All @@ -231,13 +239,13 @@ void testSimpleRoll() throws IOException {
indexer.addDocument(indexShardKey1, document);

// Get the new writer.
final IndexShardWriter writer2 = indexShardWriterCache.getWriterByShardKey(indexShardKey1);
final IndexShardWriter writer2 = indexShardWriterCache.getWriter(indexShard1.getId());

// Make sure the writers are not the same.
assertThat(writer2).isNotEqualTo(writer1);

for (int i = 1; i < 10; i++) {
assertThat(indexShardWriterCache.getWriterByShardKey(indexShardKey1)).isEqualTo(writer2);
assertThat(indexShardWriterCache.getWriter(indexShard1.getId())).isEqualTo(writer2);
indexer.addDocument(indexShardKey1, document);
}

Expand All @@ -247,4 +255,132 @@ void testSimpleRoll() throws IOException {
// Make sure the writer is still open.
assertThat(compareStatus(IndexShardStatus.OPEN, writer2.getIndexShardId())).isTrue();
}

@Test
void testFileSystemError() throws IOException {
assertThat(indexShardService.find(FindIndexShardCriteria.matchAll()).size()).isZero();

// Make index volume paths that we cannot write to.
final Path tempDir = Files.createTempDirectory("stroom");
Files.setPosixFilePermissions(tempDir, Set.of(PosixFilePermission.OWNER_READ));
final ResultPage<IndexVolume> resultPage = indexVolumeDao.find(new ExpressionCriteria());
resultPage.forEach(indexVolume -> {
indexVolume.setPath(tempDir + indexVolume.getPath());
indexVolumeDao.update(indexVolume);
});

// Do some work.
final IndexDocument document = new IndexDocument();
document.add(new FieldValue(LuceneIndexField.createField("test"), ValString.create("test")));

final DocRef indexRef1 = commonTestScenarioCreator.createIndex("TEST_2010",
commonTestScenarioCreator.createIndexFields(),
10);
final LuceneIndexDoc index1 = indexStore.readDocument(indexRef1);
final IndexShardKey indexShardKey1 = IndexShardKey.createKey(index1);

assertThatThrownBy(() -> indexer.addDocument(indexShardKey1, document))
.isInstanceOf(RuntimeException.class)
.hasMessageContaining("Index volume path not found");
}

@Test
void testChangeFieldType() {
assertThat(indexShardService.find(FindIndexShardCriteria.matchAll()).size()).isZero();

final DocRef indexRef1 = commonTestScenarioCreator.createIndex("TEST_2010",
commonTestScenarioCreator.createIndexFields(),
10);
final LuceneIndexDoc index1 = indexStore.readDocument(indexRef1);
final IndexShardKey indexShardKey1 = IndexShardKey.createKey(index1);

final Selection<IndexShardStatus> statusSelection = Selection.selectNone();
statusSelection.addAll(Set.of(IndexShardStatus.CLOSED, IndexShardStatus.OPEN));
final FindIndexShardCriteria criteria = FindIndexShardCriteria
.builder()
.indexShardStatusSet(statusSelection)
.build();

final IndexDocument document1 = new IndexDocument();
document1.add(new FieldValue(LuceneIndexField
.builder()
.fldName("SourcePort")
.fldType(FieldType.TEXT)
.analyzerType(AnalyzerType.ALPHA_NUMERIC)
.termPositions(false)
.indexed(true)
.stored(false)
.build(), ValString.create("12345")));
indexer.addDocument(indexShardKey1, document1);
indexer.addDocument(indexShardKey1, document1);
assertThat(indexShardService.find(criteria).size()).isOne();

final IndexDocument document2 = new IndexDocument();
document2.add(new FieldValue(LuceneIndexField
.builder()
.fldName("SourcePort")
.fldType(FieldType.INTEGER)
.analyzerType(AnalyzerType.KEYWORD)
.termPositions(false)
.indexed(true)
.stored(false)
.build(), ValInteger.create(12345)));
indexer.addDocument(indexShardKey1, document2);
indexer.addDocument(indexShardKey1, document2);
assertThat(indexShardService.find(criteria).size()).isEqualTo(2);

final IndexDocument document3 = new IndexDocument();
document3.add(new FieldValue(LuceneIndexField
.builder()
.fldName("SourcePort")
.fldType(FieldType.TEXT)
.analyzerType(AnalyzerType.KEYWORD)
.termPositions(true)
.indexed(true)
.stored(false)
.build(), ValString.create("12345")));
indexer.addDocument(indexShardKey1, document3);
indexer.addDocument(indexShardKey1, document3);
assertThat(indexShardService.find(criteria).size()).isEqualTo(3);
}

@Disabled
@Test
void testPerformance() {
assertThat(indexShardService.find(FindIndexShardCriteria.matchAll()).size()).isZero();

final DocRef indexRef1 = commonTestScenarioCreator.createIndex("TEST_2010",
commonTestScenarioCreator.createIndexFields(),
1000000);
final LuceneIndexDoc index1 = indexStore.readDocument(indexRef1);
final IndexShardKey indexShardKey1 = IndexShardKey.createKey(index1);

final IndexDocument document1 = new IndexDocument();
document1.add(new FieldValue(LuceneIndexField
.builder()
.fldName("SourcePort")
.fldType(FieldType.TEXT)
.analyzerType(AnalyzerType.ALPHA_NUMERIC)
.termPositions(false)
.indexed(true)
.stored(false)
.build(), ValString.create("12345")));
for (int i = 0; i < 1000000; i++) {
indexer.addDocument(indexShardKey1, document1);
}
}

private void checkDocCount(final int expected, final IndexShardWriter indexShardWriter) {
assertThat(indexShardWriter.getDocumentCount()).isEqualTo(expected);
}

private void checkDocCount(final int expected, final long indexShardId) {
final IndexShard loaded = indexShardService.loadById(indexShardId);
assertThat(loaded.getDocumentCount()).isEqualTo(expected);
}

private boolean compareStatus(final IndexShardStatus expected, final long indexShardId) {
final IndexShard loaded = indexShardService.loadById(indexShardId);
return expected.equals(loaded.getStatus());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@
import stroom.docref.DocRef;
import stroom.index.impl.IndexDocument;
import stroom.index.impl.IndexFields;
import stroom.index.impl.IndexShardKeyUtil;
import stroom.index.impl.IndexShardWriter;
import stroom.index.impl.IndexStore;
import stroom.index.mock.MockIndexShardWriter;
import stroom.index.mock.MockIndexShardWriterCache;
import stroom.index.shared.IndexShardKey;
import stroom.index.shared.LuceneIndexDoc;
import stroom.index.shared.LuceneIndexField;
import stroom.pipeline.PipelineStore;
Expand Down Expand Up @@ -248,10 +246,8 @@ private List<IndexDocument> doTest(final String resourceName, final List<LuceneI
}

// Wrote anything ?
final IndexShardKey indexShardKey = IndexShardKeyUtil.createTestKey(index);
if (indexShardWriterCache.getWriters().size() > 0) {
if (!indexShardWriterCache.getWriters().isEmpty()) {
assertThat(indexShardWriterCache.getWriters().size()).isEqualTo(1);
assertThat(indexShardWriterCache.getWriters().containsKey(indexShardKey)).isTrue();

// Get a writer from the pool.
for (final IndexShardWriter writer : indexShardWriterCache.getWriters().values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import stroom.index.impl.IndexStore;
import stroom.index.mock.MockIndexShardWriter;
import stroom.index.mock.MockIndexShardWriterCache;
import stroom.index.shared.IndexShardKey;
import stroom.index.shared.LuceneIndexDoc;
import stroom.index.shared.LuceneIndexField;
import stroom.meta.shared.Meta;
Expand Down Expand Up @@ -152,7 +151,7 @@ void testSimple() {
assertThat(indexShardWriterCache.getWriters().size()).isEqualTo(1);

// Get the writer from the pool.
final Map<IndexShardKey, IndexShardWriter> writers = indexShardWriterCache.getWriters();
final Map<Long, IndexShardWriter> writers = indexShardWriterCache.getWriters();
final MockIndexShardWriter writer = (MockIndexShardWriter) writers.values().iterator().next();

// Check that we indexed 4 documents.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class TestBasicSearch extends AbstractCoreIntegrationTest {
// final String indexName = "TEST";
// final DocRef indexRef = commonTestScenarioCreator.createIndex(indexName, indexFields);
// final IndexDoc index = indexStore.readDocument(indexRef);
// final IndexShardKey indexShardKey = IndexShardKeyUtil.createTestKey(index);
// final IndexShardKey indexShardKey = IndexShardKey.createTestKey(index);
//
// // Do some work.
// for (int i = 1; i <= indexTestSize; i++) {
Expand Down
Loading

0 comments on commit 947389e

Please sign in to comment.