Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make multiple settings dynamic for Merge on Flush #17495

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix exists queries on nested flat_object fields throws exception ([#16803](https://github.com/opensearch-project/OpenSearch/pull/16803))
- Add highlighting for wildcard search on `match_only_text` field ([#17101](https://github.com/opensearch-project/OpenSearch/pull/17101))
- Fix illegal argument exception when creating a PIT ([#16781](https://github.com/opensearch-project/OpenSearch/pull/16781))
- Make `index.merge_on_flush.enabled`, `index.merge_on_flush.max_full_flush_merge_wait_time`, `index.merge_on_flush.policy`, `index.check_pending_flush.enabled` dynamic ([#17495](https://github.com/opensearch-project/OpenSearch/pull/17495))

### Security

Expand Down
10 changes: 8 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,8 @@ public static IndexMergePolicy fromString(String text) {
public static final Setting<Boolean> INDEX_CHECK_PENDING_FLUSH_ENABLED = Setting.boolSetting(
"index.check_pending_flush.enabled",
true,
Property.IndexScope
Property.IndexScope,
Property.Dynamic
);

public static final Setting<String> TIME_SERIES_INDEX_MERGE_POLICY = Setting.simpleString(
Expand Down Expand Up @@ -901,7 +902,7 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) {
/**
* Is flush check by write threads enabled or not
*/
private final boolean checkPendingFlushEnabled;
private volatile boolean checkPendingFlushEnabled;
/**
* Is fuzzy set enabled for doc id
*/
Expand Down Expand Up @@ -1200,6 +1201,11 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING,
this::setRemoteStoreTranslogRepository
);
scopedSettings.addSettingsUpdateConsumer(INDEX_CHECK_PENDING_FLUSH_ENABLED, this::setCheckPendingFlushEnabled);
}

public void setCheckPendingFlushEnabled(boolean checkPendingFlushEnabled) {
this.checkPendingFlushEnabled = checkPendingFlushEnabled;
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@
private volatile long lastDeleteVersionPruneTimeMSec;

protected final TranslogManager translogManager;
private final IndexWriterConfig indexWriterConfig;
protected final IndexWriter indexWriter;
protected final LocalCheckpointTracker localCheckpointTracker;
protected final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
Expand All @@ -166,6 +167,7 @@
protected final String historyUUID;

private final OpenSearchConcurrentMergeScheduler mergeScheduler;
private MergePolicy unMergeOnFlushPolicy;
private final ExternalReaderManager externalReaderManager;
private final OpenSearchReaderManager internalReaderManager;

Expand Down Expand Up @@ -292,7 +294,8 @@
translogManager::getLastSyncedGlobalCheckpoint
);
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
writer = createWriter();
this.indexWriterConfig = getIndexWriterConfig();
writer = createWriter(indexWriterConfig);
bootstrapAppendOnlyInfoFromWriter(writer);
final Map<String, String> commitData = commitDataAsMap(writer);
historyUUID = loadHistoryUUID(commitData);
Expand Down Expand Up @@ -2303,9 +2306,8 @@
}
}

private IndexWriter createWriter() throws IOException {
private IndexWriter createWriter(IndexWriterConfig iwc) throws IOException {
try {
final IndexWriterConfig iwc = getIndexWriterConfig();
return createWriter(store.directory(), iwc);
} catch (LockObtainFailedException ex) {
logger.warn("could not lock IndexWriter", ex);
Expand Down Expand Up @@ -2336,26 +2338,27 @@
iwc.setMergeScheduler(mergeScheduler);
// Give us the opportunity to upgrade old segments while performing
// background merges
MergePolicy mergePolicy = config().getMergePolicy();
this.unMergeOnFlushPolicy = config().getMergePolicy();
// always configure soft-deletes field so an engine with soft-deletes disabled can open a Lucene index with soft-deletes.
iwc.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD);
mergePolicy = new RecoverySourcePruneMergePolicy(
unMergeOnFlushPolicy = new RecoverySourcePruneMergePolicy(
SourceFieldMapper.RECOVERY_SOURCE_NAME,
softDeletesPolicy::getRetentionQuery,
new SoftDeletesRetentionMergePolicy(
Lucene.SOFT_DELETES_FIELD,
softDeletesPolicy::getRetentionQuery,
new PrunePostingsMergePolicy(mergePolicy, IdFieldMapper.NAME)
new PrunePostingsMergePolicy(unMergeOnFlushPolicy, IdFieldMapper.NAME)
)
);
boolean shuffleForcedMerge = Booleans.parseBoolean(System.getProperty("opensearch.shuffle_forced_merge", Boolean.TRUE.toString()));
if (shuffleForcedMerge) {
// We wrap the merge policy for all indices even though it is mostly useful for time-based indices
// but there should be no overhead for other type of indices so it's simpler than adding a setting
// to enable it.
mergePolicy = new ShuffleForcedMergePolicy(mergePolicy);
unMergeOnFlushPolicy = new ShuffleForcedMergePolicy(unMergeOnFlushPolicy);
}

MergePolicy mergePolicy = unMergeOnFlushPolicy;
if (config().getIndexSettings().isMergeOnFlushEnabled()) {
final long maxFullFlushMergeWaitMillis = config().getIndexSettings().getMaxFullFlushMergeWaitTime().millis();
if (maxFullFlushMergeWaitMillis > 0) {
Expand Down Expand Up @@ -2604,6 +2607,29 @@
// the setting will be re-interpreted if it's set to true
updateAutoIdTimestamp(Long.MAX_VALUE, true);
}
IndexSettings indexSettings = engineConfig.getIndexSettings();
if (indexSettings.isCheckPendingFlushEnabled() != indexWriterConfig.isCheckPendingFlushOnUpdate()) {
indexWriterConfig.setCheckPendingFlushUpdate(indexSettings.isCheckPendingFlushEnabled());
}
if (indexSettings.isMergeOnFlushEnabled()) {
final long maxFullFlushMergeWaitMillis = indexSettings.getMaxFullFlushMergeWaitTime().millis();
if (maxFullFlushMergeWaitMillis > 0) {
indexWriterConfig.setMaxFullFlushMergeWaitMillis(maxFullFlushMergeWaitMillis);
final Optional<UnaryOperator<MergePolicy>> mergeOnFlushPolicy = indexSettings.getMergeOnFlushPolicy();
if (mergeOnFlushPolicy.isPresent()) {
indexWriterConfig.setMergePolicy(new OpenSearchMergePolicy(mergeOnFlushPolicy.get().apply(unMergeOnFlushPolicy)));
}
} else {
logger.warn(

Check warning on line 2623 in server/src/main/java/org/opensearch/index/engine/InternalEngine.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/engine/InternalEngine.java#L2623

Added line #L2623 was not covered by tests
"The {} is enabled but {} is set to 0, merge on flush will not be activated",
IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(),
IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey()

Check warning on line 2626 in server/src/main/java/org/opensearch/index/engine/InternalEngine.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/engine/InternalEngine.java#L2625-L2626

Added lines #L2625 - L2626 were not covered by tests
);
}
} else {
indexWriterConfig.setMaxFullFlushMergeWaitMillis(0);
indexWriterConfig.setMergePolicy(new OpenSearchMergePolicy(unMergeOnFlushPolicy));
}
final TranslogDeletionPolicy translogDeletionPolicy = translogManager.getDeletionPolicy();
translogDeletionPolicy.setRetentionAgeInMillis(translogRetentionAge.millis());
translogDeletionPolicy.setRetentionSizeInBytes(translogRetentionSize.getBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.sandbox.index.MergeOnFlushMergePolicy;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.ReferenceManager;
Expand Down Expand Up @@ -142,6 +143,7 @@
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.OpenSearchMergePolicy;
import org.opensearch.index.shard.ShardUtils;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
Expand Down Expand Up @@ -6604,6 +6606,61 @@ public void testStressShouldPeriodicallyFlush() throws Exception {
}
}

public void testMultiSettingsDynamicForMerge() {
boolean checkPendingFlushEnabled = true;
boolean mergeOnFlushEnabled = true;
TimeValue maxFullFlushMergeWaitTime = TimeValue.timeValueSeconds(1);

final IndexSettings indexSettings = engine.config().getIndexSettings();
IndexMetadata indexMetadata = IndexMetadata.builder(indexSettings.getIndexMetadata())
.settings(
Settings.builder()
.put(indexSettings.getSettings())
.put(IndexSettings.INDEX_CHECK_PENDING_FLUSH_ENABLED.getKey(), checkPendingFlushEnabled)
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), mergeOnFlushEnabled)
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), maxFullFlushMergeWaitTime)
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY.getKey(), "merge-on-flush")
)
.build();
indexSettings.updateIndexMetadata(indexMetadata);
engine.onSettingsChanged(
indexSettings.getTranslogRetentionAge(),
indexSettings.getTranslogRetentionSize(),
indexSettings.getSoftDeleteRetentionOperations()
);

assertEquals(checkPendingFlushEnabled, engine.getCurrentIndexWriterConfig().isCheckPendingFlushOnUpdate());
assertEquals(maxFullFlushMergeWaitTime.millis(), engine.getCurrentIndexWriterConfig().getMaxFullFlushMergeWaitMillis());
MergePolicy mergePolicy = engine.getCurrentIndexWriterConfig().getMergePolicy();
assertTrue(mergePolicy instanceof OpenSearchMergePolicy);
assertTrue(((OpenSearchMergePolicy) mergePolicy).getDelegate() instanceof MergeOnFlushMergePolicy);

mergeOnFlushEnabled = false;
checkPendingFlushEnabled = false;
indexMetadata = IndexMetadata.builder(indexSettings.getIndexMetadata())
.settings(
Settings.builder()
.put(indexSettings.getSettings())
.put(IndexSettings.INDEX_CHECK_PENDING_FLUSH_ENABLED.getKey(), checkPendingFlushEnabled)
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), mergeOnFlushEnabled)
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), maxFullFlushMergeWaitTime)
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY.getKey(), "merge-on-flush")
)
.build();
indexSettings.updateIndexMetadata(indexMetadata);
engine.onSettingsChanged(
indexSettings.getTranslogRetentionAge(),
indexSettings.getTranslogRetentionSize(),
indexSettings.getSoftDeleteRetentionOperations()
);

assertEquals(checkPendingFlushEnabled, engine.getCurrentIndexWriterConfig().isCheckPendingFlushOnUpdate());
assertEquals(0, engine.getCurrentIndexWriterConfig().getMaxFullFlushMergeWaitMillis());
mergePolicy = engine.getCurrentIndexWriterConfig().getMergePolicy();
assertTrue(mergePolicy instanceof OpenSearchMergePolicy);
assertTrue(((OpenSearchMergePolicy) mergePolicy).getDelegate() instanceof ShuffleForcedMergePolicy);
}

public void testStressUpdateSameDocWhileGettingIt() throws IOException, InterruptedException {
final int iters = randomIntBetween(1, 15);
for (int i = 0; i < iters; i++) {
Expand Down
Loading