Skip to content

Commit

Permalink
make `index.merge_on_flush.enabled`, `index.merge_on_flush.max_full_flush_merge_wait_time`, `index.merge_on_flush.policy`, `index.check_pending_flush.enabled` dynamic
Browse files Browse the repository at this point in the history

Signed-off-by: kkewwei <[email protected]>
Signed-off-by: kkewwei <[email protected]>
  • Loading branch information
kkewwei committed Mar 3, 2025
1 parent 968eafb commit c8b84a8
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix exists queries on nested flat_object fields throwing an exception ([#16803](https://github.com/opensearch-project/OpenSearch/pull/16803))
- Add highlighting for wildcard search on `match_only_text` field ([#17101](https://github.com/opensearch-project/OpenSearch/pull/17101))
- Fix illegal argument exception when creating a PIT ([#16781](https://github.com/opensearch-project/OpenSearch/pull/16781))
- Make `index.merge_on_flush.enabled`, `index.merge_on_flush.max_full_flush_merge_wait_time`, `index.merge_on_flush.policy`, `index.check_pending_flush.enabled` dynamic ([#17495](https://github.com/opensearch-project/OpenSearch/pull/17495))

### Security

Expand Down
10 changes: 8 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,8 @@ public static IndexMergePolicy fromString(String text) {
public static final Setting<Boolean> INDEX_CHECK_PENDING_FLUSH_ENABLED = Setting.boolSetting(
"index.check_pending_flush.enabled",
true,
Property.IndexScope
Property.IndexScope,
Property.Dynamic
);

public static final Setting<String> TIME_SERIES_INDEX_MERGE_POLICY = Setting.simpleString(
Expand Down Expand Up @@ -901,7 +902,7 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) {
/**
* Is flush check by write threads enabled or not
*/
private final boolean checkPendingFlushEnabled;
private volatile boolean checkPendingFlushEnabled;
/**
* Is fuzzy set enabled for doc id
*/
Expand Down Expand Up @@ -1200,6 +1201,11 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING,
this::setRemoteStoreTranslogRepository
);
scopedSettings.addSettingsUpdateConsumer(INDEX_CHECK_PENDING_FLUSH_ENABLED, this::setCheckPendingFlushEnabled);
}

/**
 * Updates the cached value of {@code index.check_pending_flush.enabled}.
 * Registered as a dynamic settings-update consumer, so this is invoked at
 * runtime whenever the setting changes; the backing field is volatile so the
 * new value is visible to readers on other threads.
 */
public void setCheckPendingFlushEnabled(boolean checkPendingFlushEnabled) {
this.checkPendingFlushEnabled = checkPendingFlushEnabled;
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ public class InternalEngine extends Engine {
private volatile long lastDeleteVersionPruneTimeMSec;

protected final TranslogManager translogManager;
private final IndexWriterConfig indexWriterConfig;
protected final IndexWriter indexWriter;
protected final LocalCheckpointTracker localCheckpointTracker;
protected final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
Expand All @@ -166,6 +167,7 @@ public class InternalEngine extends Engine {
protected final String historyUUID;

private final OpenSearchConcurrentMergeScheduler mergeScheduler;
private MergePolicy unMergeOnFlushPolicy;
private final ExternalReaderManager externalReaderManager;
private final OpenSearchReaderManager internalReaderManager;

Expand Down Expand Up @@ -292,7 +294,8 @@ public void onFailure(String reason, Exception ex) {
translogManager::getLastSyncedGlobalCheckpoint
);
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
writer = createWriter();
this.indexWriterConfig = getIndexWriterConfig();
writer = createWriter(indexWriterConfig);
bootstrapAppendOnlyInfoFromWriter(writer);
final Map<String, String> commitData = commitDataAsMap(writer);
historyUUID = loadHistoryUUID(commitData);
Expand Down Expand Up @@ -2303,9 +2306,8 @@ protected final ReferenceManager<OpenSearchDirectoryReader> getReferenceManager(
}
}

private IndexWriter createWriter() throws IOException {
private IndexWriter createWriter(IndexWriterConfig iwc) throws IOException {
try {
final IndexWriterConfig iwc = getIndexWriterConfig();
return createWriter(store.directory(), iwc);
} catch (LockObtainFailedException ex) {
logger.warn("could not lock IndexWriter", ex);
Expand Down Expand Up @@ -2336,26 +2338,27 @@ private IndexWriterConfig getIndexWriterConfig() {
iwc.setMergeScheduler(mergeScheduler);
// Give us the opportunity to upgrade old segments while performing
// background merges
MergePolicy mergePolicy = config().getMergePolicy();
this.unMergeOnFlushPolicy = config().getMergePolicy();
// always configure soft-deletes field so an engine with soft-deletes disabled can open a Lucene index with soft-deletes.
iwc.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD);
mergePolicy = new RecoverySourcePruneMergePolicy(
unMergeOnFlushPolicy = new RecoverySourcePruneMergePolicy(
SourceFieldMapper.RECOVERY_SOURCE_NAME,
softDeletesPolicy::getRetentionQuery,
new SoftDeletesRetentionMergePolicy(
Lucene.SOFT_DELETES_FIELD,
softDeletesPolicy::getRetentionQuery,
new PrunePostingsMergePolicy(mergePolicy, IdFieldMapper.NAME)
new PrunePostingsMergePolicy(unMergeOnFlushPolicy, IdFieldMapper.NAME)
)
);
boolean shuffleForcedMerge = Booleans.parseBoolean(System.getProperty("opensearch.shuffle_forced_merge", Boolean.TRUE.toString()));
if (shuffleForcedMerge) {
// We wrap the merge policy for all indices even though it is mostly useful for time-based indices
// but there should be no overhead for other type of indices so it's simpler than adding a setting
// to enable it.
mergePolicy = new ShuffleForcedMergePolicy(mergePolicy);
unMergeOnFlushPolicy = new ShuffleForcedMergePolicy(unMergeOnFlushPolicy);
}

MergePolicy mergePolicy = unMergeOnFlushPolicy;
if (config().getIndexSettings().isMergeOnFlushEnabled()) {
final long maxFullFlushMergeWaitMillis = config().getIndexSettings().getMaxFullFlushMergeWaitTime().millis();
if (maxFullFlushMergeWaitMillis > 0) {
Expand Down Expand Up @@ -2604,6 +2607,29 @@ public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue tran
// the setting will be re-interpreted if it's set to true
updateAutoIdTimestamp(Long.MAX_VALUE, true);
}
IndexSettings indexSettings = engineConfig.getIndexSettings();
if (indexSettings.isCheckPendingFlushEnabled() != indexWriterConfig.isCheckPendingFlushOnUpdate()) {
indexWriterConfig.setCheckPendingFlushUpdate(indexSettings.isCheckPendingFlushEnabled());
}
if (indexSettings.isMergeOnFlushEnabled()) {
final long maxFullFlushMergeWaitMillis = indexSettings.getMaxFullFlushMergeWaitTime().millis();
if (maxFullFlushMergeWaitMillis > 0) {
indexWriterConfig.setMaxFullFlushMergeWaitMillis(maxFullFlushMergeWaitMillis);
final Optional<UnaryOperator<MergePolicy>> mergeOnFlushPolicy = indexSettings.getMergeOnFlushPolicy();
if (mergeOnFlushPolicy.isPresent()) {
indexWriterConfig.setMergePolicy(new OpenSearchMergePolicy(mergeOnFlushPolicy.get().apply(unMergeOnFlushPolicy)));
}
} else {
logger.warn(
"The {} is enabled but {} is set to 0, merge on flush will not be activated",
IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(),
IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey()
);
}
} else {
indexWriterConfig.setMaxFullFlushMergeWaitMillis(0);
indexWriterConfig.setMergePolicy(new OpenSearchMergePolicy(unMergeOnFlushPolicy));
}
final TranslogDeletionPolicy translogDeletionPolicy = translogManager.getDeletionPolicy();
translogDeletionPolicy.setRetentionAgeInMillis(translogRetentionAge.millis());
translogDeletionPolicy.setRetentionSizeInBytes(translogRetentionSize.getBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.sandbox.index.MergeOnFlushMergePolicy;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.ReferenceManager;
Expand Down Expand Up @@ -142,6 +143,7 @@
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.OpenSearchMergePolicy;
import org.opensearch.index.shard.ShardUtils;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
Expand Down Expand Up @@ -6604,6 +6606,58 @@ public void testStressShouldPeriodicallyFlush() throws Exception {
}
}

/**
 * Verifies that the merge-related index settings ({@code index.check_pending_flush.enabled},
 * {@code index.merge_on_flush.enabled}, {@code index.merge_on_flush.max_full_flush_merge_wait_time},
 * {@code index.merge_on_flush.policy}) are applied dynamically to the engine's live
 * {@code IndexWriterConfig} when the engine is notified of a settings change.
 */
public void testMultiSettingsDynamicForMerge() {
    final TimeValue waitTime = TimeValue.timeValueSeconds(1);
    final IndexSettings settings = engine.config().getIndexSettings();

    // Phase 1: dynamically enable pending-flush checks and merge-on-flush.
    final Settings.Builder enabledSettings = Settings.builder()
        .put(settings.getSettings())
        .put(IndexSettings.INDEX_CHECK_PENDING_FLUSH_ENABLED.getKey(), true)
        .put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), true)
        .put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), waitTime)
        .put(IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY.getKey(), "merge-on-flush");
    settings.updateIndexMetadata(IndexMetadata.builder(settings.getIndexMetadata()).settings(enabledSettings).build());
    engine.onSettingsChanged(
        settings.getTranslogRetentionAge(),
        settings.getTranslogRetentionSize(),
        settings.getSoftDeleteRetentionOperations()
    );

    // The live writer config should now reflect all four dynamic values.
    assertEquals(true, engine.getCurrentIndexWriterConfig().isCheckPendingFlushOnUpdate());
    assertEquals(waitTime.millis(), engine.getCurrentIndexWriterConfig().getMaxFullFlushMergeWaitMillis());
    MergePolicy activePolicy = engine.getCurrentIndexWriterConfig().getMergePolicy();
    assertTrue(activePolicy instanceof OpenSearchMergePolicy);
    assertTrue(((OpenSearchMergePolicy) activePolicy).getDelegate() instanceof MergeOnFlushMergePolicy);

    // Phase 2: dynamically disable merge-on-flush; wait time and policy stay set.
    final Settings.Builder disabledSettings = Settings.builder()
        .put(settings.getSettings())
        .put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), false)
        .put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), waitTime)
        .put(IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY.getKey(), "merge-on-flush");
    settings.updateIndexMetadata(IndexMetadata.builder(settings.getIndexMetadata()).settings(disabledSettings).build());
    engine.onSettingsChanged(
        settings.getTranslogRetentionAge(),
        settings.getTranslogRetentionSize(),
        settings.getSoftDeleteRetentionOperations()
    );

    // Disabling merge-on-flush zeroes the wait time and unwraps back to the
    // non-merge-on-flush policy chain (ShuffleForcedMergePolicy outermost).
    assertEquals(0, engine.getCurrentIndexWriterConfig().getMaxFullFlushMergeWaitMillis());
    activePolicy = engine.getCurrentIndexWriterConfig().getMergePolicy();
    assertTrue(activePolicy instanceof OpenSearchMergePolicy);
    assertTrue(((OpenSearchMergePolicy) activePolicy).getDelegate() instanceof ShuffleForcedMergePolicy);
}

public void testStressUpdateSameDocWhileGettingIt() throws IOException, InterruptedException {
final int iters = randomIntBetween(1, 15);
for (int i = 0; i < iters; i++) {
Expand Down

0 comments on commit c8b84a8

Please sign in to comment.