From bbb906fe6d7091a9adf2e2183a7f7aca52aac172 Mon Sep 17 00:00:00 2001 From: kkewwei Date: Mon, 3 Mar 2025 17:27:40 +0800 Subject: [PATCH] make `index.merge_on_flush.enabled`, `index.merge_on_flush.max_full_flush_merge_wait_time`, `index.merge_on_flush.policy`, `index.check_pending_flush.enabled` dynamic Signed-off-by: kkewwei Signed-off-by: kkewwei --- CHANGELOG.md | 1 + .../org/opensearch/index/IndexSettings.java | 10 +++- .../index/engine/InternalEngine.java | 40 +++++++++++--- .../index/engine/InternalEngineTests.java | 54 +++++++++++++++++++ 4 files changed, 96 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 45fd4813e72da..920bd72f42468 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix exists queries on nested flat_object fields throws exception ([#16803](https://github.com/opensearch-project/OpenSearch/pull/16803)) - Add highlighting for wildcard search on `match_only_text` field ([#17101](https://github.com/opensearch-project/OpenSearch/pull/17101)) - Fix illegal argument exception when creating a PIT ([#16781](https://github.com/opensearch-project/OpenSearch/pull/16781)) +- Make `index.merge_on_flush.enabled`, `index.merge_on_flush.max_full_flush_merge_wait_time`, `index.merge_on_flush.policy`, `index.check_pending_flush.enabled` dynamic ([#17495](https://github.com/opensearch-project/OpenSearch/pull/17495)) ### Security diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 554e99764c1a1..79fa1d07156d1 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -648,7 +648,8 @@ public static IndexMergePolicy fromString(String text) { public static final Setting INDEX_CHECK_PENDING_FLUSH_ENABLED = Setting.boolSetting( "index.check_pending_flush.enabled", true, - Property.IndexScope + Property.IndexScope, + Property.Dynamic ); public static final Setting TIME_SERIES_INDEX_MERGE_POLICY = Setting.simpleString( @@ -901,7 +902,7 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) { /** * Is flush check by write threads enabled or not */ - private final boolean checkPendingFlushEnabled; + private volatile boolean checkPendingFlushEnabled; /** * Is fuzzy set enabled for doc id */ @@ -1200,6 +1201,11 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING, this::setRemoteStoreTranslogRepository ); + scopedSettings.addSettingsUpdateConsumer(INDEX_CHECK_PENDING_FLUSH_ENABLED, this::setCheckPendingFlushEnabled); + } + + public void setCheckPendingFlushEnabled(boolean checkPendingFlushEnabled) { + this.checkPendingFlushEnabled = checkPendingFlushEnabled; } private void setSearchIdleAfter(TimeValue searchIdleAfter) { diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 064e757c6ebb7..eb916b82d0116 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -156,6 +156,7 @@ public class InternalEngine extends Engine { private volatile long lastDeleteVersionPruneTimeMSec; protected final TranslogManager translogManager; + private final IndexWriterConfig indexWriterConfig; protected final IndexWriter indexWriter; protected final LocalCheckpointTracker localCheckpointTracker; protected final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1); @@ -166,6 +167,7 @@ public class InternalEngine extends Engine { protected final String historyUUID; private final OpenSearchConcurrentMergeScheduler mergeScheduler; + private MergePolicy unMergeOnFlushPolicy; private final ExternalReaderManager externalReaderManager; private final OpenSearchReaderManager internalReaderManager; @@ -292,7 +294,8 @@ public void onFailure(String reason, Exception ex) { translogManager::getLastSyncedGlobalCheckpoint ); this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier); - writer = createWriter(); + this.indexWriterConfig = getIndexWriterConfig(); + writer = createWriter(indexWriterConfig); bootstrapAppendOnlyInfoFromWriter(writer); final Map commitData = commitDataAsMap(writer); historyUUID = loadHistoryUUID(commitData); @@ -2303,9 +2306,8 @@ protected final ReferenceManager getReferenceManager( } } - private IndexWriter createWriter() throws IOException { + private IndexWriter createWriter(IndexWriterConfig iwc) throws IOException { try { - final IndexWriterConfig iwc = getIndexWriterConfig(); return createWriter(store.directory(), iwc); } catch (LockObtainFailedException ex) { logger.warn("could not lock IndexWriter", ex); @@ -2336,16 +2338,16 @@ private IndexWriterConfig getIndexWriterConfig() { iwc.setMergeScheduler(mergeScheduler); // Give us the opportunity to upgrade old segments while performing // background merges - MergePolicy mergePolicy = config().getMergePolicy(); + this.unMergeOnFlushPolicy = config().getMergePolicy(); // always configure soft-deletes field so an engine with soft-deletes disabled can open a Lucene index with soft-deletes. iwc.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD); - mergePolicy = new RecoverySourcePruneMergePolicy( + unMergeOnFlushPolicy = new RecoverySourcePruneMergePolicy( SourceFieldMapper.RECOVERY_SOURCE_NAME, softDeletesPolicy::getRetentionQuery, new SoftDeletesRetentionMergePolicy( Lucene.SOFT_DELETES_FIELD, softDeletesPolicy::getRetentionQuery, - new PrunePostingsMergePolicy(mergePolicy, IdFieldMapper.NAME) + new PrunePostingsMergePolicy(unMergeOnFlushPolicy, IdFieldMapper.NAME) ) ); boolean shuffleForcedMerge = Booleans.parseBoolean(System.getProperty("opensearch.shuffle_forced_merge", Boolean.TRUE.toString())); @@ -2353,9 +2355,10 @@ private IndexWriterConfig getIndexWriterConfig() { // We wrap the merge policy for all indices even though it is mostly useful for time-based indices // but there should be no overhead for other type of indices so it's simpler than adding a setting // to enable it. - mergePolicy = new ShuffleForcedMergePolicy(mergePolicy); + unMergeOnFlushPolicy = new ShuffleForcedMergePolicy(unMergeOnFlushPolicy); } + MergePolicy mergePolicy = unMergeOnFlushPolicy; if (config().getIndexSettings().isMergeOnFlushEnabled()) { final long maxFullFlushMergeWaitMillis = config().getIndexSettings().getMaxFullFlushMergeWaitTime().millis(); if (maxFullFlushMergeWaitMillis > 0) { @@ -2604,6 +2607,29 @@ public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue tran // the setting will be re-interpreted if it's set to true updateAutoIdTimestamp(Long.MAX_VALUE, true); } + IndexSettings indexSettings = engineConfig.getIndexSettings(); + if (indexSettings.isCheckPendingFlushEnabled() != indexWriterConfig.isCheckPendingFlushOnUpdate()) { + indexWriterConfig.setCheckPendingFlushUpdate(indexSettings.isCheckPendingFlushEnabled()); + } + if (indexSettings.isMergeOnFlushEnabled()) { + final long maxFullFlushMergeWaitMillis = indexSettings.getMaxFullFlushMergeWaitTime().millis(); + if (maxFullFlushMergeWaitMillis > 0) { + indexWriterConfig.setMaxFullFlushMergeWaitMillis(maxFullFlushMergeWaitMillis); + final Optional> mergeOnFlushPolicy = indexSettings.getMergeOnFlushPolicy(); + if (mergeOnFlushPolicy.isPresent()) { + indexWriterConfig.setMergePolicy(new OpenSearchMergePolicy(mergeOnFlushPolicy.get().apply(unMergeOnFlushPolicy))); + } + } else { + logger.warn( + "The {} is enabled but {} is set to 0, merge on flush will not be activated", + IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), + IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey() + ); + } + } else { + indexWriterConfig.setMaxFullFlushMergeWaitMillis(0); + indexWriterConfig.setMergePolicy(new OpenSearchMergePolicy(unMergeOnFlushPolicy)); + } final TranslogDeletionPolicy translogDeletionPolicy = translogManager.getDeletionPolicy(); translogDeletionPolicy.setRetentionAgeInMillis(translogRetentionAge.millis()); translogDeletionPolicy.setRetentionSizeInBytes(translogRetentionSize.getBytes()); diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java index fa501b155e96b..b45ce5b4b6455 100644 --- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java @@ -67,6 +67,7 @@ import org.apache.lucene.index.Terms; import org.apache.lucene.index.TermsEnum; import org.apache.lucene.index.TieredMergePolicy; +import org.apache.lucene.sandbox.index.MergeOnFlushMergePolicy; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.ReferenceManager; @@ -142,6 +143,7 @@ import org.opensearch.index.seqno.RetentionLeases; import org.opensearch.index.seqno.SeqNoStats; import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.index.shard.OpenSearchMergePolicy; import org.opensearch.index.shard.ShardUtils; import org.opensearch.index.store.Store; import org.opensearch.index.translog.DefaultTranslogDeletionPolicy; @@ -6604,6 +6606,58 @@ public void testStressShouldPeriodicallyFlush() throws Exception { } } + public void testT1() throws Exception { + boolean checkPendingFlushEnabled = true; + boolean mergeOnFlushEnabled = true; + TimeValue maxFullFlushMergeWaitTime = TimeValue.timeValueSeconds(1); + + final IndexSettings indexSettings = engine.config().getIndexSettings(); + IndexMetadata indexMetadata = IndexMetadata.builder(indexSettings.getIndexMetadata()) + .settings( + Settings.builder() + .put(indexSettings.getSettings()) + .put(IndexSettings.INDEX_CHECK_PENDING_FLUSH_ENABLED.getKey(), checkPendingFlushEnabled) + .put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), mergeOnFlushEnabled) + .put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), maxFullFlushMergeWaitTime) + .put(IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY.getKey(), "merge-on-flush") + ) + .build(); + indexSettings.updateIndexMetadata(indexMetadata); + engine.onSettingsChanged( + indexSettings.getTranslogRetentionAge(), + indexSettings.getTranslogRetentionSize(), + indexSettings.getSoftDeleteRetentionOperations() + ); + + assertEquals(checkPendingFlushEnabled, engine.getCurrentIndexWriterConfig().isCheckPendingFlushOnUpdate()); + assertEquals(maxFullFlushMergeWaitTime.millis(), engine.getCurrentIndexWriterConfig().getMaxFullFlushMergeWaitMillis()); + MergePolicy mergePolicy = engine.getCurrentIndexWriterConfig().getMergePolicy(); + assertTrue(mergePolicy instanceof OpenSearchMergePolicy); + assertTrue(((OpenSearchMergePolicy) mergePolicy).getDelegate() instanceof MergeOnFlushMergePolicy); + + mergeOnFlushEnabled = false; + indexMetadata = IndexMetadata.builder(indexSettings.getIndexMetadata()) + .settings( + Settings.builder() + .put(indexSettings.getSettings()) + .put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), mergeOnFlushEnabled) + .put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), maxFullFlushMergeWaitTime) + .put(IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY.getKey(), "merge-on-flush") + ) + .build(); + indexSettings.updateIndexMetadata(indexMetadata); + engine.onSettingsChanged( + indexSettings.getTranslogRetentionAge(), + indexSettings.getTranslogRetentionSize(), + indexSettings.getSoftDeleteRetentionOperations() + ); + + assertEquals(0, engine.getCurrentIndexWriterConfig().getMaxFullFlushMergeWaitMillis()); + mergePolicy = engine.getCurrentIndexWriterConfig().getMergePolicy(); + assertTrue(mergePolicy instanceof OpenSearchMergePolicy); + assertTrue(((OpenSearchMergePolicy) mergePolicy).getDelegate() instanceof ShuffleForcedMergePolicy); + } + public void testStressUpdateSameDocWhileGettingIt() throws IOException, InterruptedException { final int iters = randomIntBetween(1, 15); for (int i = 0; i < iters; i++) {