diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index d2cd5be4..c3f599a0 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -10,21 +10,26 @@ on: jobs: build: + strategy: + matrix: + java: + - 11 + - 17 # Job name name: Build Replication plugin runs-on: ubuntu-latest steps: # This step uses the setup-java Github action: https://github.com/actions/setup-java - - name: Set Up JDK 14 + - name: Set Up JDK ${{ matrix.java }} uses: actions/setup-java@v1 with: - java-version: 14 + java-version: ${{ matrix.java }} # This step uses the checkout Github action: https://github.com/actions/checkout - name: Checkout Branch uses: actions/checkout@v2 - name: Build and run Replication tests run: | - ./gradlew clean release -Dbuild.snapshot=true -Dopensearch.version=1.3.0-SNAPSHOT + ./gradlew clean release -Dbuild.snapshot=true - name: Upload failed logs uses: actions/upload-artifact@v2 if: failure() diff --git a/.github/workflows/bwc.yml b/.github/workflows/bwc.yml index f95a3785..547edea0 100644 --- a/.github/workflows/bwc.yml +++ b/.github/workflows/bwc.yml @@ -15,17 +15,17 @@ jobs: runs-on: ubuntu-latest steps: # This step uses the setup-java Github action: https://github.com/actions/setup-java - - name: Set Up JDK 14 + - name: Set Up JDK 11 uses: actions/setup-java@v1 with: - java-version: 14 + java-version: 11 # This step uses the checkout Github action: https://github.com/actions/checkout - name: Checkout Branch uses: actions/checkout@v2 - name: Build and run Replication tests run: | echo "Running backwards compatibility tests ..." 
- ./gradlew clean release -Dbuild.snapshot=true -Dopensearch.version=1.3.0-SNAPSHOT -x test -x IntegTest + ./gradlew clean release -Dbuild.snapshot=true -x test -x IntegTest ./gradlew mixedClusterTask --stacktrace ./gradlew fullRestartClusterTask --stacktrace - name: Upload failed logs diff --git a/.github/workflows/security-tests.yml b/.github/workflows/security-tests.yml index b96b7d4f..ee435d15 100644 --- a/.github/workflows/security-tests.yml +++ b/.github/workflows/security-tests.yml @@ -15,10 +15,10 @@ jobs: runs-on: ubuntu-latest steps: # This step uses the setup-java Github action: https://github.com/actions/setup-java - - name: Set Up JDK 14 + - name: Set Up JDK 11 uses: actions/setup-java@v1 with: - java-version: 14 + java-version: 11 # This step uses the checkout Github action: https://github.com/actions/checkout - name: Checkout Branch uses: actions/checkout@v2 @@ -33,11 +33,11 @@ jobs: working-directory: ./security run: | ./gradlew clean build -Dbuild.snapshot=false -x test - cp build/distributions/opensearch-security-1.3.0.0.zip ../src/test/resources/security/plugin/opensearch-security.zip + cp build/distributions/opensearch-security-*.zip ../src/test/resources/security/plugin/opensearch-security.zip - name: Build and run Replication tests run: | ls -al src/test/resources/security/plugin - ./gradlew clean release -Dbuild.snapshot=true -Dopensearch.version=1.3.0-SNAPSHOT -PnumNodes=1 -Psecurity=true + ./gradlew clean release -Dbuild.snapshot=true -PnumNodes=1 -Psecurity=true - name: Upload failed logs uses: actions/upload-artifact@v2 if: failure() @@ -54,4 +54,4 @@ jobs: - name: Uploads coverage with: fetch-depth: 2 - uses: codecov/codecov-action@v1.2.1 \ No newline at end of file + uses: codecov/codecov-action@v1.2.1 diff --git a/README.md b/README.md index 0c13ad82..ee66e368 100644 --- a/README.md +++ b/README.md @@ -45,7 +45,7 @@ The replication machinery is implemented as an OpenSearch plugin that exposes AP The project in this package uses the 
[Gradle](https://docs.gradle.org/current/userguide/userguide.html) build system. Gradle comes with excellent documentation that should be your first stop when trying to figure out how to operate or modify the build. ### Building from the command line -Set JAVA_HOME to JDK-14 or above +Set JAVA_HOME to JDK-11 or above 1. `./gradlew build` builds and tests project. 2. `./gradlew clean release` cleans previous builds, creates new build and tests project. @@ -163,4 +163,4 @@ This project is licensed under the Apache-2.0 License. ## Copyright -Copyright OpenSearch Contributors. See [NOTICE](NOTICE) for details. \ No newline at end of file +Copyright OpenSearch Contributors. See [NOTICE](NOTICE) for details. diff --git a/build.gradle b/build.gradle index 535cd084..09f4a29e 100644 --- a/build.gradle +++ b/build.gradle @@ -10,8 +10,11 @@ */ +import javax.management.ObjectName +import javax.management.remote.JMXConnector import javax.management.remote.JMXConnectorFactory import javax.management.remote.JMXServiceURL +import javax.management.MBeanServerInvocationHandler import javax.net.ssl.HostnameVerifier import javax.net.ssl.HttpsURLConnection import javax.net.ssl.SSLContext @@ -32,18 +35,27 @@ import org.opensearch.gradle.test.RestIntegTestTask buildscript { ext { - isSnapshot = "true" == System.getProperty("build.snapshot", "false") - opensearch_version = System.getProperty("opensearch.version", "1.3.0-SNAPSHOT") + isSnapshot = "true" == System.getProperty("build.snapshot", "true") + opensearch_version = System.getProperty("opensearch.version", "2.0.0-alpha1-SNAPSHOT") + buildVersionQualifier = System.getProperty("build.version_qualifier", "alpha1") // Taken from https://github.com/opensearch-project/alerting/blob/main/build.gradle#L33 // 1.0.0 -> 1.0.0.0, and 1.0.0-SNAPSHOT -> 1.0.0.0-SNAPSHOT - opensearch_build = opensearch_version.replaceAll(/(\.\d)([^\d]*)$/, '$1.0$2') + version_tokens = opensearch_version.tokenize('-') + opensearch_build = version_tokens[0] + '.0' + 
if (buildVersionQualifier) { + opensearch_build += "-${buildVersionQualifier}" + } + if (isSnapshot) { + opensearch_build += "-SNAPSHOT" + } + // for bwc tests - opensearch_previous_version = System.getProperty("bwc_older_version", "1.1.0") + opensearch_previous_version = System.getProperty("bwc_older_version", "1.3.1") plugin_previous_version = opensearch_previous_version.replaceAll(/(\.\d)([^\d]*)$/, '$1.0$2') common_utils_version = System.getProperty("common_utils.version", opensearch_build) - kotlin_version = System.getProperty("kotlin.version", "1.3.72") + kotlin_version = System.getProperty("kotlin.version", "1.6.0") } @@ -71,10 +83,7 @@ plugins { allprojects { group = "org.opensearch" - version = "${opensearch_version}" - "-SNAPSHOT" + ".0" - if (isSnapshot) { - version += "-SNAPSHOT" - } + version = "${opensearch_build}" } apply plugin: 'java' @@ -109,21 +118,21 @@ configurations.all { } dependencies { - compileOnly "org.opensearch:opensearch:${opensearch_version}" - compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8" - compile "org.jetbrains.kotlin:kotlin-stdlib-jdk7" - compile "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}" - compile "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}" - compile "org.jetbrains:annotations:13.0" - compile "com.github.seancfoley:ipaddress:5.3.3" - compile "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.5" - compile "org.opensearch:common-utils:${common_utils_version}" - - testCompile "org.opensearch.test:framework:${opensearch_version}" + runtimeOnly "org.opensearch:opensearch:${opensearch_version}" + implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8" + implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk7" + implementation "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}" + implementation "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}" + implementation "org.jetbrains:annotations:13.0" + implementation "com.github.seancfoley:ipaddress:5.3.3" + implementation 
"org.jetbrains.kotlinx:kotlinx-coroutines-core:${kotlin_version}" + implementation "org.opensearch:common-utils:${common_utils_version}" + + testImplementation "org.opensearch.test:framework:${opensearch_version}" testImplementation "org.assertj:assertj-core:3.17.2" testImplementation "org.opensearch.client:opensearch-rest-high-level-client:${opensearch_version}" - testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-test:1.3.5" - testCompile "org.jetbrains.kotlin:kotlin-test:${kotlin_version}" + testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-test:${kotlin_version}" + testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}" testImplementation "com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0" } @@ -375,6 +384,15 @@ def configureCluster(OpenSearchCluster cluster, Boolean securityEnabled) { cluster.waitForAllConditions() } +interface IProxy { + byte[] getExecutionData(boolean reset); + + void dump(boolean reset); + + void reset(); +} + + int startJmxPort=7777 int endJmxPort = startJmxPort integTest { @@ -437,13 +455,15 @@ integTest { doLast { for (int port=startJmxPort; port() { @Override @@ -561,17 +623,18 @@ clusters.each { name -> testClusters { "$name" { versions = [opensearch_previous_version, opensearch_version] - plugin(provider(new Callable() { - @Override - RegularFile call() throws Exception { - return new RegularFile() { - @Override - File getAsFile() { - return fileTree(repl_old_zip).getSingleFile() - } - } - } - })) + plugin(project.tasks.zipBwcPlugin.archiveFile) +// plugin(provider(new Callable() { +// @Override +// RegularFile call() throws Exception { +// return new RegularFile() { +// @Override +// File getAsFile() { +// return fileTree(repl_old_zip).getSingleFile() +// } +// } +// } +// })) if (securityEnabled) { plugin(provider(securityPluginOld)) diff --git a/docs/RFC.md b/docs/RFC.md index 4d65738c..23b69f22 100644 --- a/docs/RFC.md +++ b/docs/RFC.md @@ -58,7 +58,7 @@ The replication machinery is implemented 
as an OpenSearch plugin that exposes AP ### Details -The Start Replication API performs basic validations (such as existence checks on the remote cluster & index) and then spawns a persistent background task named `IndexReplicationTask` in the follower cluster to coordinate the replication process. This task does not replicate any data directly, but rather is responsible for initiating subtasks and monitoring the overall replication process. Hence, it can run on any node on the cluster (including master nodes) and we chose the node with the least number of tasks at the time. Each step in the workflow is checkpointed so that it can be resumed safely if interrupted. +The Start Replication API performs basic validations (such as existence checks on the remote cluster & index) and then spawns a persistent background task named `IndexReplicationTask` in the follower cluster to coordinate the replication process. This task does not replicate any data directly, but rather is responsible for initiating subtasks and monitoring the overall replication process. Hence, it can run on any node on the cluster (including Cluster manager nodes) and we chose the node with the least number of tasks at the time. Each step in the workflow is checkpointed so that it can be resumed safely if interrupted. 
![Details](/docs/images/rfc1.png?raw=true "Details") diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 36533ab8..8622ab59 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -11,6 +11,6 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-6.6.1-all.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-7.3.3-all.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/release-notes/opensearch-cross-cluster-replication.release-notes-1.3.0.0.md b/release-notes/opensearch-cross-cluster-replication.release-notes-1.3.0.0.md new file mode 100644 index 00000000..953eda3b --- /dev/null +++ b/release-notes/opensearch-cross-cluster-replication.release-notes-1.3.0.0.md @@ -0,0 +1,11 @@ +## Version 1.3.0.0 Release Notes + +Compatible with OpenSearch 1.3.0 +### Enhancements +* Enhance Autofollow task to start replication jobs based on settings ([#321](https://github.com/opensearch-project/cross-cluster-replication/pull/321)) + +### Bug Fixes +* Bugfix: Strengthen validation checks for status API ([#317](https://github.com/opensearch-project/cross-cluster-replication/pull/317)) + +### Infrastructure +* [CI] Default CI Java Version to Java 11, run tests on 8, 11 and 17 ([#329](https://github.com/opensearch-project/cross-cluster-replication/pull/329)) diff --git a/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt b/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt index 3bc10a1b..3fa602b0 100644 --- a/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt +++ b/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt @@ -11,16 +11,12 @@ package org.opensearch.replication -import org.opensearch.replication.action.autofollow.AutoFollowMasterNodeAction -import 
org.opensearch.replication.action.autofollow.TransportAutoFollowMasterNodeAction -import org.opensearch.replication.action.autofollow.TransportUpdateAutoFollowPatternAction -import org.opensearch.replication.action.autofollow.UpdateAutoFollowPatternAction import org.opensearch.replication.action.changes.GetChangesAction import org.opensearch.replication.action.changes.TransportGetChangesAction import org.opensearch.replication.action.index.ReplicateIndexAction -import org.opensearch.replication.action.index.ReplicateIndexMasterNodeAction +import org.opensearch.replication.action.index.ReplicateIndexClusterManagerNodeAction import org.opensearch.replication.action.index.TransportReplicateIndexAction -import org.opensearch.replication.action.index.TransportReplicateIndexMasterNodeAction +import org.opensearch.replication.action.index.TransportReplicateIndexClusterManagerNodeAction import org.opensearch.replication.action.index.block.TransportUpddateIndexBlockAction import org.opensearch.replication.action.index.block.UpdateIndexBlockAction import org.opensearch.replication.action.pause.PauseIndexReplicationAction @@ -120,6 +116,7 @@ import org.opensearch.plugins.EnginePlugin import org.opensearch.plugins.PersistentTaskPlugin import org.opensearch.plugins.Plugin import org.opensearch.plugins.RepositoryPlugin +import org.opensearch.replication.action.autofollow.* import org.opensearch.replication.action.stats.AutoFollowStatsAction import org.opensearch.replication.action.stats.FollowerStatsAction import org.opensearch.replication.action.stats.LeaderStatsAction @@ -191,6 +188,8 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin, ByteSizeValue(512, ByteSizeUnit.MB), Setting.Property.Dynamic, Setting.Property.IndexScope) val REPLICATION_FOLLOWER_BLOCK_START: Setting = Setting.boolSetting("plugins.replication.follower.block.start", false, Setting.Property.Dynamic, Setting.Property.NodeScope) + val 
REPLICATION_AUTOFOLLOW_CONCURRENT_REPLICATION_JOBS_TRIGGER_SIZE: Setting = Setting.intSetting("plugins.replication.autofollow.concurrent_replication_jobs_trigger_size", 3, 1, 10, + Setting.Property.Dynamic, Setting.Property.NodeScope) } override fun createComponents(client: Client, clusterService: ClusterService, threadPool: ThreadPool, @@ -216,12 +215,12 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin, override fun getActions(): List> { return listOf(ActionHandler(GetChangesAction.INSTANCE, TransportGetChangesAction::class.java), ActionHandler(ReplicateIndexAction.INSTANCE, TransportReplicateIndexAction::class.java), - ActionHandler(ReplicateIndexMasterNodeAction.INSTANCE, TransportReplicateIndexMasterNodeAction::class.java), + ActionHandler(ReplicateIndexClusterManagerNodeAction.INSTANCE, TransportReplicateIndexClusterManagerNodeAction::class.java), ActionHandler(ReplayChangesAction.INSTANCE, TransportReplayChangesAction::class.java), ActionHandler(GetStoreMetadataAction.INSTANCE, TransportGetStoreMetadataAction::class.java), ActionHandler(GetFileChunkAction.INSTANCE, TransportGetFileChunkAction::class.java), ActionHandler(UpdateAutoFollowPatternAction.INSTANCE, TransportUpdateAutoFollowPatternAction::class.java), - ActionHandler(AutoFollowMasterNodeAction.INSTANCE, TransportAutoFollowMasterNodeAction::class.java), + ActionHandler(AutoFollowClusterManagerNodeAction.INSTANCE, TransportAutoFollowClusterManagerNodeAction::class.java), ActionHandler(StopIndexReplicationAction.INSTANCE, TransportStopIndexReplicationAction::class.java), ActionHandler(PauseIndexReplicationAction.INSTANCE, TransportPauseIndexReplicationAction::class.java), ActionHandler(ResumeIndexReplicationAction.INSTANCE, TransportResumeIndexReplicationAction::class.java), @@ -348,7 +347,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin, REPLICATION_PARALLEL_READ_POLL_INTERVAL, REPLICATION_AUTOFOLLOW_REMOTE_INDICES_POLL_INTERVAL, 
REPLICATION_AUTOFOLLOW_REMOTE_INDICES_RETRY_POLL_INTERVAL, REPLICATION_METADATA_SYNC_INTERVAL, REPLICATION_RETENTION_LEASE_MAX_FAILURE_DURATION, REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING, - REPLICATION_INDEX_TRANSLOG_RETENTION_SIZE, REPLICATION_FOLLOWER_BLOCK_START) + REPLICATION_INDEX_TRANSLOG_RETENTION_SIZE, REPLICATION_FOLLOWER_BLOCK_START, REPLICATION_AUTOFOLLOW_CONCURRENT_REPLICATION_JOBS_TRIGGER_SIZE) } override fun getInternalRepositories(env: Environment, namedXContentRegistry: NamedXContentRegistry, diff --git a/src/main/kotlin/org/opensearch/replication/ReplicationSettings.kt b/src/main/kotlin/org/opensearch/replication/ReplicationSettings.kt index 2a29764e..a6d1bbd3 100644 --- a/src/main/kotlin/org/opensearch/replication/ReplicationSettings.kt +++ b/src/main/kotlin/org/opensearch/replication/ReplicationSettings.kt @@ -31,6 +31,7 @@ open class ReplicationSettings(clusterService: ClusterService) { @Volatile var metadataSyncInterval = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_METADATA_SYNC_INTERVAL) @Volatile var leaseRenewalMaxFailureDuration: TimeValue = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_RETENTION_LEASE_MAX_FAILURE_DURATION) @Volatile var followerBlockStart: Boolean = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_FOLLOWER_BLOCK_START) + @Volatile var autofollowConcurrentJobsTriggerSize: Int = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_AUTOFOLLOW_CONCURRENT_REPLICATION_JOBS_TRIGGER_SIZE) init { listenForUpdates(clusterService.clusterSettings) @@ -47,5 +48,6 @@ open class ReplicationSettings(clusterService: ClusterService) { clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_AUTOFOLLOW_REMOTE_INDICES_RETRY_POLL_INTERVAL) { autofollowRetryPollDuration = it } clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_METADATA_SYNC_INTERVAL) { metadataSyncInterval = it } 
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_FOLLOWER_BLOCK_START) { followerBlockStart = it } + clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_AUTOFOLLOW_CONCURRENT_REPLICATION_JOBS_TRIGGER_SIZE) { autofollowConcurrentJobsTriggerSize = it } } } diff --git a/src/main/kotlin/org/opensearch/replication/action/autofollow/AutoFollowMasterNodeAction.kt b/src/main/kotlin/org/opensearch/replication/action/autofollow/AutoFollowClusterManagerNodeAction.kt similarity index 77% rename from src/main/kotlin/org/opensearch/replication/action/autofollow/AutoFollowMasterNodeAction.kt rename to src/main/kotlin/org/opensearch/replication/action/autofollow/AutoFollowClusterManagerNodeAction.kt index 181941ec..ccda3a15 100644 --- a/src/main/kotlin/org/opensearch/replication/action/autofollow/AutoFollowMasterNodeAction.kt +++ b/src/main/kotlin/org/opensearch/replication/action/autofollow/AutoFollowClusterManagerNodeAction.kt @@ -14,9 +14,9 @@ package org.opensearch.replication.action.autofollow import org.opensearch.action.ActionType import org.opensearch.action.support.master.AcknowledgedResponse -class AutoFollowMasterNodeAction: ActionType(NAME, ::AcknowledgedResponse) { +class AutoFollowClusterManagerNodeAction: ActionType(NAME, ::AcknowledgedResponse) { companion object { const val NAME = "internal:cluster:admin/plugins/replication/autofollow/update" - val INSTANCE = AutoFollowMasterNodeAction() + val INSTANCE = AutoFollowClusterManagerNodeAction() } } diff --git a/src/main/kotlin/org/opensearch/replication/action/autofollow/AutoFollowMasterNodeRequest.kt b/src/main/kotlin/org/opensearch/replication/action/autofollow/AutoFollowClusterManagerNodeRequest.kt similarity index 94% rename from src/main/kotlin/org/opensearch/replication/action/autofollow/AutoFollowMasterNodeRequest.kt rename to src/main/kotlin/org/opensearch/replication/action/autofollow/AutoFollowClusterManagerNodeRequest.kt index 57f2a8fc..80c05995 100644 --- 
a/src/main/kotlin/org/opensearch/replication/action/autofollow/AutoFollowMasterNodeRequest.kt +++ b/src/main/kotlin/org/opensearch/replication/action/autofollow/AutoFollowClusterManagerNodeRequest.kt @@ -20,7 +20,7 @@ import org.opensearch.common.xcontent.ToXContent import org.opensearch.common.xcontent.ToXContentObject import org.opensearch.common.xcontent.XContentBuilder -class AutoFollowMasterNodeRequest: MasterNodeRequest, ToXContentObject { +class AutoFollowClusterManagerNodeRequest: MasterNodeRequest, ToXContentObject { var user: User? = null var autofollowReq: UpdateAutoFollowPatternRequest var withSecurityContext: Boolean = false diff --git a/src/main/kotlin/org/opensearch/replication/action/autofollow/TransportAutoFollowMasterNodeAction.kt b/src/main/kotlin/org/opensearch/replication/action/autofollow/TransportAutoFollowClusterManagerNodeAction.kt similarity index 87% rename from src/main/kotlin/org/opensearch/replication/action/autofollow/TransportAutoFollowMasterNodeAction.kt rename to src/main/kotlin/org/opensearch/replication/action/autofollow/TransportAutoFollowClusterManagerNodeAction.kt index c3fc8554..e3c59444 100644 --- a/src/main/kotlin/org/opensearch/replication/action/autofollow/TransportAutoFollowMasterNodeAction.kt +++ b/src/main/kotlin/org/opensearch/replication/action/autofollow/TransportAutoFollowClusterManagerNodeAction.kt @@ -45,26 +45,26 @@ import org.opensearch.replication.ReplicationException import org.opensearch.threadpool.ThreadPool import org.opensearch.transport.TransportService -class TransportAutoFollowMasterNodeAction @Inject constructor(transportService: TransportService, clusterService: ClusterService, threadPool: ThreadPool, +class TransportAutoFollowClusterManagerNodeAction @Inject constructor(transportService: TransportService, clusterService: ClusterService, threadPool: ThreadPool, actionFilters: ActionFilters, indexNameExpressionResolver: IndexNameExpressionResolver, private val client: NodeClient, private val 
metadataManager: ReplicationMetadataManager, val indexScopedSettings: IndexScopedSettings) : - TransportMasterNodeAction( - AutoFollowMasterNodeAction.NAME, true, transportService, clusterService, threadPool, actionFilters, - ::AutoFollowMasterNodeRequest, indexNameExpressionResolver), CoroutineScope by GlobalScope { + TransportMasterNodeAction( + AutoFollowClusterManagerNodeAction.NAME, true, transportService, clusterService, threadPool, actionFilters, + ::AutoFollowClusterManagerNodeRequest, indexNameExpressionResolver), CoroutineScope by GlobalScope { companion object { - private val log = LogManager.getLogger(TransportAutoFollowMasterNodeAction::class.java) + private val log = LogManager.getLogger(TransportAutoFollowClusterManagerNodeAction::class.java) const val AUTOFOLLOW_EXCEPTION_GENERIC_STRING = "Failed to update autofollow pattern" } - override fun checkBlock(request: AutoFollowMasterNodeRequest, state: ClusterState): ClusterBlockException? { + override fun checkBlock(request: AutoFollowClusterManagerNodeRequest, state: ClusterState): ClusterBlockException? 
{ return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE) } - override fun masterOperation(masterNodeReq: AutoFollowMasterNodeRequest, state: ClusterState, listener: ActionListener) { - val request = masterNodeReq.autofollowReq - var user = masterNodeReq.user + override fun masterOperation(clusterManagerNodeReq: AutoFollowClusterManagerNodeRequest, state: ClusterState, listener: ActionListener) { + val request = clusterManagerNodeReq.autofollowReq + var user = clusterManagerNodeReq.user launch(threadPool.coroutineContext()) { listener.completeWith { if (request.action == UpdateAutoFollowPatternRequest.Action.REMOVE) { diff --git a/src/main/kotlin/org/opensearch/replication/action/autofollow/TransportUpdateAutoFollowPatternAction.kt b/src/main/kotlin/org/opensearch/replication/action/autofollow/TransportUpdateAutoFollowPatternAction.kt index 9bc735be..4cb32b1a 100644 --- a/src/main/kotlin/org/opensearch/replication/action/autofollow/TransportUpdateAutoFollowPatternAction.kt +++ b/src/main/kotlin/org/opensearch/replication/action/autofollow/TransportUpdateAutoFollowPatternAction.kt @@ -63,8 +63,8 @@ class TransportUpdateAutoFollowPatternAction @Inject constructor(transportServic throw org.opensearch.replication.ReplicationException("Setup checks failed while setting-up auto follow pattern") } } - val masterNodeReq = AutoFollowMasterNodeRequest(user, request) - client.suspendExecute(AutoFollowMasterNodeAction.INSTANCE, masterNodeReq) + val clusterManagerNodeReq = AutoFollowClusterManagerNodeRequest(user, request) + client.suspendExecute(AutoFollowClusterManagerNodeAction.INSTANCE, clusterManagerNodeReq) } } } diff --git a/src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt b/src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt index d5f1e094..c483ebad 100644 --- a/src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt +++ 
b/src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt @@ -114,7 +114,7 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus if(!fetchFromTranslog) { log.debug("Fetching changes from lucene for ${request.shardId} - from:${request.fromSeqNo}, to:$toSeqNo") relativeStartNanos = System.nanoTime() - indexShard.newChangesSnapshot("odr", request.fromSeqNo, toSeqNo, true).use { snapshot -> + indexShard.newChangesSnapshot("odr", request.fromSeqNo, toSeqNo, true, true).use { snapshot -> ops = ArrayList(snapshot.totalOperations()) var op = snapshot.next() while (op != null) { diff --git a/src/main/kotlin/org/opensearch/replication/action/index/ReplicateIndexMasterNodeAction.kt b/src/main/kotlin/org/opensearch/replication/action/index/ReplicateIndexClusterManagerNodeAction.kt similarity index 70% rename from src/main/kotlin/org/opensearch/replication/action/index/ReplicateIndexMasterNodeAction.kt rename to src/main/kotlin/org/opensearch/replication/action/index/ReplicateIndexClusterManagerNodeAction.kt index 7fc14530..aebfb6cc 100644 --- a/src/main/kotlin/org/opensearch/replication/action/index/ReplicateIndexMasterNodeAction.kt +++ b/src/main/kotlin/org/opensearch/replication/action/index/ReplicateIndexClusterManagerNodeAction.kt @@ -14,9 +14,9 @@ package org.opensearch.replication.action.index import org.opensearch.action.ActionType import org.opensearch.action.support.master.AcknowledgedResponse -class ReplicateIndexMasterNodeAction private constructor(): ActionType(NAME, ::AcknowledgedResponse) { +class ReplicateIndexClusterManagerNodeAction private constructor(): ActionType(NAME, ::AcknowledgedResponse) { companion object { const val NAME = "internal:indices/admin/plugins/replication/index/start" - val INSTANCE: ReplicateIndexMasterNodeAction = ReplicateIndexMasterNodeAction() + val INSTANCE: ReplicateIndexClusterManagerNodeAction = ReplicateIndexClusterManagerNodeAction() } } diff --git 
a/src/main/kotlin/org/opensearch/replication/action/index/ReplicateIndexMasterNodeRequest.kt b/src/main/kotlin/org/opensearch/replication/action/index/ReplicateIndexClusterManagerNodeRequest.kt similarity index 94% rename from src/main/kotlin/org/opensearch/replication/action/index/ReplicateIndexMasterNodeRequest.kt rename to src/main/kotlin/org/opensearch/replication/action/index/ReplicateIndexClusterManagerNodeRequest.kt index b25a0d8e..2c06b6ca 100644 --- a/src/main/kotlin/org/opensearch/replication/action/index/ReplicateIndexMasterNodeRequest.kt +++ b/src/main/kotlin/org/opensearch/replication/action/index/ReplicateIndexClusterManagerNodeRequest.kt @@ -20,8 +20,8 @@ import org.opensearch.common.xcontent.ToXContent import org.opensearch.common.xcontent.ToXContentObject import org.opensearch.common.xcontent.XContentBuilder -class ReplicateIndexMasterNodeRequest: - MasterNodeRequest, ToXContentObject { +class ReplicateIndexClusterManagerNodeRequest: + MasterNodeRequest, ToXContentObject { var user: User? 
= null var replicateIndexReq: ReplicateIndexRequest diff --git a/src/main/kotlin/org/opensearch/replication/action/index/TransportReplicateIndexAction.kt b/src/main/kotlin/org/opensearch/replication/action/index/TransportReplicateIndexAction.kt index 11a60aa1..69d56bdf 100644 --- a/src/main/kotlin/org/opensearch/replication/action/index/TransportReplicateIndexAction.kt +++ b/src/main/kotlin/org/opensearch/replication/action/index/TransportReplicateIndexAction.kt @@ -105,8 +105,8 @@ class TransportReplicateIndexAction @Inject constructor(transportService: Transp // Setup checks are successful and trigger replication for the index // permissions evaluation to trigger replication is based on the current security context set - val internalReq = ReplicateIndexMasterNodeRequest(user, request) - client.suspendExecute(ReplicateIndexMasterNodeAction.INSTANCE, internalReq) + val internalReq = ReplicateIndexClusterManagerNodeRequest(user, request) + client.suspendExecute(ReplicateIndexClusterManagerNodeAction.INSTANCE, internalReq) ReplicateIndexResponse(true) } } diff --git a/src/main/kotlin/org/opensearch/replication/action/index/TransportReplicateIndexMasterNodeAction.kt b/src/main/kotlin/org/opensearch/replication/action/index/TransportReplicateIndexClusterManagerNodeAction.kt similarity index 87% rename from src/main/kotlin/org/opensearch/replication/action/index/TransportReplicateIndexMasterNodeAction.kt rename to src/main/kotlin/org/opensearch/replication/action/index/TransportReplicateIndexClusterManagerNodeAction.kt index 4d9fa108..1a926798 100644 --- a/src/main/kotlin/org/opensearch/replication/action/index/TransportReplicateIndexMasterNodeAction.kt +++ b/src/main/kotlin/org/opensearch/replication/action/index/TransportReplicateIndexClusterManagerNodeAction.kt @@ -52,22 +52,22 @@ import org.opensearch.threadpool.ThreadPool import org.opensearch.transport.TransportService import java.io.IOException -class TransportReplicateIndexMasterNodeAction @Inject 
constructor(transportService: TransportService, - clusterService: ClusterService, - threadPool: ThreadPool, - actionFilters: ActionFilters, - indexNameExpressionResolver: IndexNameExpressionResolver, - val indexScopedSettings: IndexScopedSettings, - private val persistentTasksService: PersistentTasksService, - private val nodeClient : NodeClient, - private val repositoryService: RepositoriesService, - private val replicationMetadataManager: ReplicationMetadataManager) : - TransportMasterNodeAction(ReplicateIndexMasterNodeAction.NAME, - transportService, clusterService, threadPool, actionFilters, ::ReplicateIndexMasterNodeRequest, indexNameExpressionResolver), +class TransportReplicateIndexClusterManagerNodeAction @Inject constructor(transportService: TransportService, + clusterService: ClusterService, + threadPool: ThreadPool, + actionFilters: ActionFilters, + indexNameExpressionResolver: IndexNameExpressionResolver, + val indexScopedSettings: IndexScopedSettings, + private val persistentTasksService: PersistentTasksService, + private val nodeClient : NodeClient, + private val repositoryService: RepositoriesService, + private val replicationMetadataManager: ReplicationMetadataManager) : + TransportMasterNodeAction(ReplicateIndexClusterManagerNodeAction.NAME, + transportService, clusterService, threadPool, actionFilters, ::ReplicateIndexClusterManagerNodeRequest, indexNameExpressionResolver), CoroutineScope by GlobalScope { companion object { - private val log = LogManager.getLogger(TransportReplicateIndexMasterNodeAction::class.java) + private val log = LogManager.getLogger(TransportReplicateIndexClusterManagerNodeAction::class.java) } override fun executor(): String { @@ -80,7 +80,7 @@ class TransportReplicateIndexMasterNodeAction @Inject constructor(transportServi } @Throws(Exception::class) - override fun masterOperation(request: ReplicateIndexMasterNodeRequest, state: ClusterState, + override fun masterOperation(request: ReplicateIndexClusterManagerNodeRequest, 
state: ClusterState, listener: ActionListener) { val replicateIndexReq = request.replicateIndexReq val user = request.user @@ -151,7 +151,7 @@ class TransportReplicateIndexMasterNodeAction @Inject constructor(transportServi return remoteState.metadata.index(leaderIndex) ?: throw IndexNotFoundException("${leaderAlias}:${leaderIndex}") } - override fun checkBlock(request: ReplicateIndexMasterNodeRequest, state: ClusterState): ClusterBlockException? { + override fun checkBlock(request: ReplicateIndexClusterManagerNodeRequest, state: ClusterState): ClusterBlockException? { return state.blocks.globalBlockedException(ClusterBlockLevel.METADATA_WRITE) } } diff --git a/src/main/kotlin/org/opensearch/replication/action/replay/TransportReplayChangesAction.kt b/src/main/kotlin/org/opensearch/replication/action/replay/TransportReplayChangesAction.kt index b609a3e5..8e80a06c 100644 --- a/src/main/kotlin/org/opensearch/replication/action/replay/TransportReplayChangesAction.kt +++ b/src/main/kotlin/org/opensearch/replication/action/replay/TransportReplayChangesAction.kt @@ -123,8 +123,7 @@ class TransportReplayChangesAction @Inject constructor(settings: Settings, trans if (result.resultType == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { waitForMappingUpdate { // fetch mappings from the leader cluster when applying on PRIMARY... 
- syncRemoteMapping(request.leaderAlias, request.leaderIndex, request.shardId()!!.indexName, - op.docType()) + syncRemoteMapping(request.leaderAlias, request.leaderIndex, request.shardId()!!.indexName) } result = primaryShard.applyTranslogOperation(op, Engine.Operation.Origin.PRIMARY) } @@ -156,26 +155,18 @@ class TransportReplayChangesAction @Inject constructor(settings: Settings, trans return WriteReplicaResult(request, location, null, replicaShard, log) } - private fun Translog.Operation.docType(): String { - return when (this) { - is Translog.Index -> type() - is Translog.Delete -> type() - else -> TODO("Operation ${opType()} not expected to have a document type") - } - } - /** - * Fetches the index mapping from the leader cluster, applies it to the local cluster's master and then waits + * Fetches the index mapping from the leader cluster, applies it to the local cluster's clusterManager and then waits * for the mapping to become available on the current shard. Should only be called on the primary shard . 
*/ private suspend fun syncRemoteMapping(leaderAlias: String, leaderIndex: String, - followerIndex: String, type: String) { - log.debug("Syncing mappings from ${leaderAlias}:${leaderIndex}/${type} -> $followerIndex...") + followerIndex: String) { + log.debug("Syncing mappings from ${leaderAlias}:${leaderIndex} -> $followerIndex...") val remoteClient = client.getRemoteClusterClient(leaderAlias) val options = IndicesOptions.strictSingleIndexNoExpandForbidClosed() val getMappingsRequest = GetMappingsRequest().indices(leaderIndex).indicesOptions(options) val getMappingsResponse = remoteClient.suspending(remoteClient.admin().indices()::getMappings, injectSecurityContext = true)(getMappingsRequest) - val mappingSource = getMappingsResponse?.mappings()?.get(leaderIndex)?.get(type)?.source()?.string() + val mappingSource = getMappingsResponse?.mappings()?.get(leaderIndex)?.source()?.string() if (null == mappingSource) { log.error("Mapping response: $getMappingsResponse") throw MappingNotAvailableException("Mapping for the index $leaderIndex is not available") @@ -186,7 +177,7 @@ class TransportReplayChangesAction @Inject constructor(settings: Settings, trans // PutMappingRequest#setConcreteIndex has a bug where it throws an NPE.This is fixed upstream in // https://github.com/elastic/elasticsearch/pull/58419 and we should update to that when it is released. val putMappingRequest = PutMappingRequest().indices(followerIndex).indicesOptions(options) - .type(type).source(mappingSource, XContentType.JSON) + .source(mappingSource, XContentType.JSON) //TODO: call .masterNodeTimeout() with the setting indices.mapping.dynamic_timeout val updateMappingRequest = UpdateMetadataRequest(followerIndex, UpdateMetadataRequest.Type.MAPPING, putMappingRequest) client.suspendExecute(UpdateMetadataAction.INSTANCE, updateMappingRequest, injectSecurityContext = true) @@ -212,13 +203,13 @@ class TransportReplayChangesAction @Inject constructor(settings: Settings, trans return when (opType()!!) 
{ Translog.Operation.Type.CREATE, Translog.Operation.Type.INDEX -> { val sourceOp = this as Translog.Index - Translog.Index(sourceOp.type(), sourceOp.id(), sourceOp.seqNo(), operationPrimaryTerm, + Translog.Index(sourceOp.id(), sourceOp.seqNo(), operationPrimaryTerm, sourceOp.version(), BytesReference.toBytes(sourceOp.source()), sourceOp.routing(), sourceOp.autoGeneratedIdTimestamp) } Translog.Operation.Type.DELETE -> { val sourceOp = this as Translog.Delete - Translog.Delete(sourceOp.type(), sourceOp.id(), sourceOp.uid(), sourceOp.seqNo(), operationPrimaryTerm, + Translog.Delete(sourceOp.id(), sourceOp.seqNo(), operationPrimaryTerm, sourceOp.version()) } Translog.Operation.Type.NO_OP -> { @@ -233,7 +224,7 @@ class TransportReplayChangesAction @Inject constructor(settings: Settings, trans // Unset auto gen timestamp as we use external Id from the leader index if (opType()!! == Translog.Operation.Type.CREATE || opType()!! == Translog.Operation.Type.INDEX ) { val sourceOp = this as Translog.Index - return Translog.Index(sourceOp.type(), sourceOp.id(), sourceOp.seqNo(), sourceOp.primaryTerm(), + return Translog.Index(sourceOp.id(), sourceOp.seqNo(), sourceOp.primaryTerm(), sourceOp.version(), BytesReference.toBytes(sourceOp.source()), sourceOp.routing(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP) } diff --git a/src/main/kotlin/org/opensearch/replication/action/status/ShardInfoRequest.kt b/src/main/kotlin/org/opensearch/replication/action/status/ShardInfoRequest.kt index 40c0d560..23549470 100644 --- a/src/main/kotlin/org/opensearch/replication/action/status/ShardInfoRequest.kt +++ b/src/main/kotlin/org/opensearch/replication/action/status/ShardInfoRequest.kt @@ -39,7 +39,11 @@ class ShardInfoRequest : BroadcastRequest , ToXContentObject { } override fun validate(): ActionRequestValidationException? 
{ - return null + var validationException = ActionRequestValidationException() + if(indexName.isEmpty()) { + validationException.addValidationError("Index name must be specified to obtain replication status") + } + return if(validationException.validationErrors().isEmpty()) return null else validationException } override fun indices(): Array { diff --git a/src/main/kotlin/org/opensearch/replication/action/status/ShardInfoResponse.kt b/src/main/kotlin/org/opensearch/replication/action/status/ShardInfoResponse.kt index e4113a2a..4cadea82 100644 --- a/src/main/kotlin/org/opensearch/replication/action/status/ShardInfoResponse.kt +++ b/src/main/kotlin/org/opensearch/replication/action/status/ShardInfoResponse.kt @@ -11,7 +11,6 @@ package org.opensearch.replication.action.status -import org.apache.logging.log4j.LogManager import org.opensearch.action.support.broadcast.BroadcastResponse import org.opensearch.action.support.broadcast.BroadcastShardResponse import org.opensearch.common.ParseField @@ -29,11 +28,16 @@ class ShardInfoResponse : BroadcastShardResponse, ToXContentObject { lateinit var replayDetails: ReplayDetails lateinit var restoreDetails: RestoreDetails + companion object { + const val BOOTSTRAPPING = "BOOTSTRAPPING" + const val SYNCING = "SYNCING" + } + constructor(si: StreamInput) : super(si) { this.status = si.readString() - if (status.equals("SYNCING")) + if (status.equals(SYNCING)) this.replayDetails = ReplayDetails(si) - if (status.equals("BOOTSTRAPPING")) + if (status.equals(BOOTSTRAPPING)) this.restoreDetails = RestoreDetails(si) } diff --git a/src/main/kotlin/org/opensearch/replication/action/status/TranportShardsInfoAction.kt b/src/main/kotlin/org/opensearch/replication/action/status/TranportShardsInfoAction.kt index ee5e359e..d3f542d8 100644 --- a/src/main/kotlin/org/opensearch/replication/action/status/TranportShardsInfoAction.kt +++ b/src/main/kotlin/org/opensearch/replication/action/status/TranportShardsInfoAction.kt @@ -79,13 +79,13 @@ class 
TranportShardsInfoAction @Inject constructor(clusterService: ClusterServi var indexState = indexShard.recoveryState().index if (indexShard.recoveryState().recoverySource.type.equals(RecoverySource.Type.SNAPSHOT) and (indexState.recoveredBytesPercent() <100)) { - return ShardInfoResponse(shardRouting.shardId(),"BOOTSTRAPPING", + return ShardInfoResponse(shardRouting.shardId(),ShardInfoResponse.BOOTSTRAPPING, RestoreDetails(indexState.totalBytes(), indexState.recoveredBytes(), indexState.recoveredBytesPercent(), indexState.totalFileCount(), indexState.recoveredFileCount(), indexState.recoveredFilesPercent(), indexState.startTime(), indexState.time())) } var seqNo = indexShard.localCheckpoint + 1 - return ShardInfoResponse(shardRouting.shardId(),"SYNCING", ReplayDetails(indexShard.lastKnownGlobalCheckpoint, + return ShardInfoResponse(shardRouting.shardId(),ShardInfoResponse.SYNCING, ReplayDetails(indexShard.lastKnownGlobalCheckpoint, indexShard.lastSyncedGlobalCheckpoint, seqNo)) } diff --git a/src/main/kotlin/org/opensearch/replication/action/status/TransportReplicationStatusAction.kt b/src/main/kotlin/org/opensearch/replication/action/status/TransportReplicationStatusAction.kt index 3e2e2dda..317fb621 100644 --- a/src/main/kotlin/org/opensearch/replication/action/status/TransportReplicationStatusAction.kt +++ b/src/main/kotlin/org/opensearch/replication/action/status/TransportReplicationStatusAction.kt @@ -68,7 +68,7 @@ class TransportReplicationStatusAction @Inject constructor(transportService: Tra if (followerResponse.shardInfoResponse.size > 0) { status = followerResponse.shardInfoResponse.get(0).status } - if (!status.equals("BOOTSTRAPPING")) { + if (!status.equals(ShardInfoResponse.BOOTSTRAPPING)) { var shardResponses = followerResponse.shardInfoResponse leaderResponse.shardInfoResponse.listIterator().forEach { val leaderShardName = it.shardId.toString() diff --git a/src/main/kotlin/org/opensearch/replication/metadata/TransportUpdateMetadataAction.kt 
b/src/main/kotlin/org/opensearch/replication/metadata/TransportUpdateMetadataAction.kt index a2b21ee3..7a7fb09d 100644 --- a/src/main/kotlin/org/opensearch/replication/metadata/TransportUpdateMetadataAction.kt +++ b/src/main/kotlin/org/opensearch/replication/metadata/TransportUpdateMetadataAction.kt @@ -249,10 +249,9 @@ class TransportUpdateMetadataAction @Inject constructor( listener: ActionListener, metadataMappingService: MetadataMappingService ) { val mappingRequest = request.request as PutMappingRequest - val updateRequest = PutMappingClusterStateUpdateRequest() + val updateRequest = PutMappingClusterStateUpdateRequest(mappingRequest.source()) .ackTimeout(mappingRequest.timeout()).masterNodeTimeout(mappingRequest.masterNodeTimeout()) - .indices(concreteIndices).type(mappingRequest.type()) - .source(mappingRequest.source()) + .indices(concreteIndices) metadataMappingService.putMapping(updateRequest, object : ActionListener { diff --git a/src/main/kotlin/org/opensearch/replication/metadata/store/ReplicationMetadataStore.kt b/src/main/kotlin/org/opensearch/replication/metadata/store/ReplicationMetadataStore.kt index b57df431..0ce5683f 100644 --- a/src/main/kotlin/org/opensearch/replication/metadata/store/ReplicationMetadataStore.kt +++ b/src/main/kotlin/org/opensearch/replication/metadata/store/ReplicationMetadataStore.kt @@ -99,7 +99,7 @@ class ReplicationMetadataStore constructor(val client: Client, val clusterServic val id = getId(addReq.replicationMetadata.metadataType, addReq.replicationMetadata.connectionName, addReq.replicationMetadata.followerContext.resource) - val indexReqBuilder = client.prepareIndex(REPLICATION_CONFIG_SYSTEM_INDEX, MAPPING_TYPE, id) + val indexReqBuilder = client.prepareIndex(REPLICATION_CONFIG_SYSTEM_INDEX).setId(id) .setSource(addReq.replicationMetadata.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) return client.suspending(indexReqBuilder::execute, defaultContext = true)("replication") } @@ -114,7 +114,7 @@ 
class ReplicationMetadataStore constructor(val client: Client, val clusterServic } if(REPLICATION_STORE_MAPPING_VERSION > currentSchemaVersion) { - val putMappingReq = PutMappingRequest(REPLICATION_CONFIG_SYSTEM_INDEX).type(MAPPING_TYPE) + val putMappingReq = PutMappingRequest(REPLICATION_CONFIG_SYSTEM_INDEX) .source(REPLICATION_CONFIG_SYSTEM_INDEX_MAPPING, XContentType.JSON) val putMappingRes = client.suspending(client.admin().indices()::putMapping, defaultContext = true)(putMappingReq) if(!putMappingRes.isAcknowledged) { @@ -227,7 +227,7 @@ class ReplicationMetadataStore constructor(val client: Client, val clusterServic checkAndWaitForStoreHealth() checkAndUpdateMapping() - val indexReqBuilder = client.prepareIndex(REPLICATION_CONFIG_SYSTEM_INDEX, MAPPING_TYPE, id) + val indexReqBuilder = client.prepareIndex(REPLICATION_CONFIG_SYSTEM_INDEX).setId(id) .setSource(updateMetadataReq.replicationMetadata.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) .setIfSeqNo(updateMetadataReq.ifSeqno) .setIfPrimaryTerm(updateMetadataReq.ifPrimaryTerm) @@ -245,7 +245,7 @@ class ReplicationMetadataStore constructor(val client: Client, val clusterServic private suspend fun createIndex(): CreateIndexResponse { val createIndexReq = CreateIndexRequest(REPLICATION_CONFIG_SYSTEM_INDEX, configStoreSettings()) - .mapping(MAPPING_TYPE, REPLICATION_CONFIG_SYSTEM_INDEX_MAPPING, XContentType.JSON) + .mapping(REPLICATION_CONFIG_SYSTEM_INDEX_MAPPING, XContentType.JSON) return client.suspending(client.admin().indices()::create, defaultContext = true)(createIndexReq) } diff --git a/src/main/kotlin/org/opensearch/replication/repository/RemoteClusterRestoreLeaderService.kt b/src/main/kotlin/org/opensearch/replication/repository/RemoteClusterRestoreLeaderService.kt index 511de564..7adfc8aa 100644 --- a/src/main/kotlin/org/opensearch/replication/repository/RemoteClusterRestoreLeaderService.kt +++ 
b/src/main/kotlin/org/opensearch/replication/repository/RemoteClusterRestoreLeaderService.kt @@ -22,7 +22,6 @@ import org.opensearch.common.inject.Inject import org.opensearch.common.inject.Singleton import org.opensearch.common.lucene.store.InputStreamIndexInput import org.opensearch.core.internal.io.IOUtils -import org.opensearch.index.engine.Engine import org.opensearch.index.seqno.RetentionLeaseActions import org.opensearch.index.store.Store import org.opensearch.indices.IndicesService @@ -95,7 +94,7 @@ class RemoteClusterRestoreLeaderService @Inject constructor(private val indicesS * lucene index. With the retention lock set - safe commit should have all the history * upto the current retention leases. */ - val retentionLock = leaderIndexShard.acquireHistoryRetentionLock(Engine.HistorySource.INDEX) + val retentionLock = leaderIndexShard.acquireHistoryRetentionLock() closableResources.add(retentionLock) /** @@ -108,7 +107,7 @@ class RemoteClusterRestoreLeaderService @Inject constructor(private val indicesS val store = leaderIndexShard.store() var metadataSnapshot = Store.MetadataSnapshot.EMPTY store.performOp({ - metadataSnapshot = store.getMetadata(indexCommitRef.indexCommit) + metadataSnapshot = store.getMetadata(indexCommitRef.get()) }) // Identifies the seq no to start the replication operations from diff --git a/src/main/kotlin/org/opensearch/replication/repository/RestoreContext.kt b/src/main/kotlin/org/opensearch/replication/repository/RestoreContext.kt index 8265d4cd..4658eca9 100644 --- a/src/main/kotlin/org/opensearch/replication/repository/RestoreContext.kt +++ b/src/main/kotlin/org/opensearch/replication/repository/RestoreContext.kt @@ -11,20 +11,22 @@ package org.opensearch.replication.repository +import org.apache.lucene.index.IndexCommit import org.opensearch.replication.util.performOp import org.apache.lucene.store.IOContext import org.apache.lucene.store.IndexInput import org.opensearch.OpenSearchException +import 
org.opensearch.common.concurrent.GatedCloseable import org.opensearch.index.engine.Engine import org.opensearch.index.shard.IndexShard import org.opensearch.index.store.Store import java.io.Closeable class RestoreContext(val restoreUUID: String, - val shard: IndexShard, - val indexCommitRef: Engine.IndexCommitRef, - val metadataSnapshot: Store.MetadataSnapshot, - val replayOperationsFrom: Long): Closeable { + val shard: IndexShard, + val indexCommitRef: GatedCloseable, + val metadataSnapshot: Store.MetadataSnapshot, + val replayOperationsFrom: Long): Closeable { companion object { private const val INITIAL_FILE_CACHE_CAPACITY = 20 diff --git a/src/main/kotlin/org/opensearch/replication/rest/ReplicationStatusHandler.kt b/src/main/kotlin/org/opensearch/replication/rest/ReplicationStatusHandler.kt index 7e1d926e..50f6f388 100644 --- a/src/main/kotlin/org/opensearch/replication/rest/ReplicationStatusHandler.kt +++ b/src/main/kotlin/org/opensearch/replication/rest/ReplicationStatusHandler.kt @@ -39,7 +39,7 @@ class ReplicationStatusHandler : BaseRestHandler() { @Throws(IOException::class) override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { - val index = request.param("index") + val index = request.param("index", "") var isVerbose = (request.paramAsBoolean("verbose", false)) val indexReplicationStatusRequest = ShardInfoRequest(index,isVerbose) return RestChannelConsumer { diff --git a/src/main/kotlin/org/opensearch/replication/seqno/RemoteClusterTranslogService.kt b/src/main/kotlin/org/opensearch/replication/seqno/RemoteClusterTranslogService.kt index 64b1a7fe..3ad2e00e 100644 --- a/src/main/kotlin/org/opensearch/replication/seqno/RemoteClusterTranslogService.kt +++ b/src/main/kotlin/org/opensearch/replication/seqno/RemoteClusterTranslogService.kt @@ -37,33 +37,17 @@ class RemoteClusterTranslogService : AbstractLifecycleComponent(){ public fun getHistoryOfOperations(indexShard: IndexShard, startSeqNo: Long, toSeqNo: Long): List { 
log.trace("Fetching translog snapshot for $indexShard - from $startSeqNo to $toSeqNo") - val snapshot = indexShard.getHistoryOperations(SOURCE_NAME, Engine.HistorySource.TRANSLOG, startSeqNo, toSeqNo) - - // Total ops to be fetched (both toSeqNo and startSeqNo are inclusive) - val opsSize = toSeqNo - startSeqNo + 1 - val ops = ArrayList(opsSize.toInt()) - - // Filter and sort specific ops from the obtained history - var filteredOpsFromTranslog = 0 - snapshot.use { - var op = snapshot.next() - while(op != null) { - if(op.seqNo() in startSeqNo..toSeqNo) { - ops.add(op) - filteredOpsFromTranslog++ - } + // TODO: Revisit the method after closing the issue: https://github.com/opensearch-project/OpenSearch/issues/2482 + var ops: List = listOf() + indexShard.newChangesSnapshot("odr", startSeqNo, toSeqNo, true, true).use { snapshot -> + ops = ArrayList(snapshot.totalOperations()) + var op = snapshot.next() + while (op != null) { + (ops as ArrayList).add(op) op = snapshot.next() } } - assert(filteredOpsFromTranslog == opsSize.toInt()) {"Missing operations while fetching from translog"} - - val sortedOps = ArrayList(opsSize.toInt()) - sortedOps.addAll(ops) - for(ele in ops) { - sortedOps[(ele.seqNo() - startSeqNo).toInt()] = ele - } - log.debug("Starting seqno after sorting ${sortedOps[0].seqNo()} and ending seqno ${sortedOps[ops.size-1].seqNo()}") - return sortedOps.subList(0, ops.size.coerceAtMost((opsSize).toInt())) + return ops } } diff --git a/src/main/kotlin/org/opensearch/replication/task/CrossClusterReplicationTask.kt b/src/main/kotlin/org/opensearch/replication/task/CrossClusterReplicationTask.kt index 7c5de728..933be175 100644 --- a/src/main/kotlin/org/opensearch/replication/task/CrossClusterReplicationTask.kt +++ b/src/main/kotlin/org/opensearch/replication/task/CrossClusterReplicationTask.kt @@ -27,6 +27,7 @@ import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import kotlinx.coroutines.withContext import kotlinx.coroutines.withTimeoutOrNull 
+import kotlinx.coroutines.ObsoleteCoroutinesApi import org.apache.logging.log4j.Logger import org.opensearch.OpenSearchException import org.opensearch.action.ActionListener @@ -178,6 +179,7 @@ abstract class CrossClusterReplicationTask(id: Long, type: String, action: Strin client.suspending(::updatePersistentTaskState)(state) } + @ObsoleteCoroutinesApi protected abstract suspend fun execute(scope: CoroutineScope, initialState: PersistentTaskState?) protected open suspend fun cleanup() {} diff --git a/src/main/kotlin/org/opensearch/replication/task/autofollow/AutoFollowTask.kt b/src/main/kotlin/org/opensearch/replication/task/autofollow/AutoFollowTask.kt index 41590511..570e39b7 100644 --- a/src/main/kotlin/org/opensearch/replication/task/autofollow/AutoFollowTask.kt +++ b/src/main/kotlin/org/opensearch/replication/task/autofollow/AutoFollowTask.kt @@ -36,6 +36,9 @@ import org.opensearch.common.xcontent.ToXContent import org.opensearch.common.xcontent.XContentBuilder import org.opensearch.persistent.PersistentTaskState import org.opensearch.replication.ReplicationException +import org.opensearch.replication.action.status.ReplicationStatusAction +import org.opensearch.replication.action.status.ShardInfoRequest +import org.opensearch.replication.action.status.ShardInfoResponse import org.opensearch.rest.RestStatus import org.opensearch.tasks.Task import org.opensearch.tasks.TaskId @@ -61,6 +64,7 @@ class AutoFollowTask(id: Long, type: String, action: String, description: String override val log = Loggers.getLogger(javaClass, leaderAlias) private var trackingIndicesOnTheCluster = setOf() private var failedIndices = ConcurrentSkipListSet() // Failed indices for replication from this autofollow task + private var replicationJobsQueue = ConcurrentSkipListSet() // To keep track of outstanding replication jobs for this autofollow task private var retryScheduler: Scheduler.ScheduledCancellable? 
= null lateinit var stat: AutoFollowStat @@ -69,7 +73,7 @@ class AutoFollowTask(id: Long, type: String, action: String, description: String while (scope.isActive) { try { addRetryScheduler() - autoFollow() + pollForIndices() delay(replicationSettings.autofollowFetchPollDuration.millis) } catch(e: OpenSearchException) { @@ -101,7 +105,7 @@ class AutoFollowTask(id: Long, type: String, action: String, description: String retryScheduler?.cancel() } - private suspend fun autoFollow() { + private suspend fun pollForIndices() { log.debug("Checking $leaderAlias under pattern name $patternName for new indices to auto follow") val entry = replicationMetadata.leaderContext.resource @@ -133,13 +137,44 @@ class AutoFollowTask(id: Long, type: String, action: String, description: String trackingIndicesOnTheCluster = currentIndices.toSet() } } - remoteIndices = remoteIndices.minus(currentIndices).minus(failedIndices) + remoteIndices = remoteIndices.minus(currentIndices).minus(failedIndices).minus(replicationJobsQueue) stat.failCounterForRun = 0 + startReplicationJobs(remoteIndices) + stat.failCount = stat.failCounterForRun + } + + private suspend fun startReplicationJobs(remoteIndices: Iterable) { + val completedJobs = ConcurrentSkipListSet() + for(index in replicationJobsQueue) { + val statusReq = ShardInfoRequest(index, false) + try { + val statusRes = client.suspendExecute(ReplicationStatusAction.INSTANCE, statusReq, injectSecurityContext = true) + if(statusRes.status != ShardInfoResponse.BOOTSTRAPPING) { + completedJobs.add(index) + } + } catch (ex: Exception) { + log.error("Error while fetching the status for index $index", ex) + } + } + + // Remove the indices in "syncing" state from the queue + replicationJobsQueue.removeAll(completedJobs) + val concurrentJobsAllowed = replicationSettings.autofollowConcurrentJobsTriggerSize + if(replicationJobsQueue.size >= concurrentJobsAllowed) { + log.debug("Max concurrent replication jobs already in the queue for autofollow 
task[${params.patternName}]") + return + } + + var totalJobsToTrigger = concurrentJobsAllowed - replicationJobsQueue.size for (newRemoteIndex in remoteIndices) { + if(totalJobsToTrigger <= 0) { + break + } startReplication(newRemoteIndex) + replicationJobsQueue.add(newRemoteIndex) + totalJobsToTrigger-- } - stat.failCount = stat.failCounterForRun } private suspend fun startReplication(leaderIndex: String) { diff --git a/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt b/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt index e499813d..8ca392ae 100644 --- a/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt +++ b/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt @@ -798,7 +798,7 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript } else { return FailedState(Collections.emptyMap(), """ Unable to find in progress restore for remote index: $leaderAlias:$leaderIndex. - This can happen if there was a badly timed master node failure.""".trimIndent()) + This can happen if there was a badly timed cluster manager node failure.""".trimIndent()) } } else if (restore.state() == RestoreInProgress.State.FAILURE) { val failureReason = restore.shards().values().find { diff --git a/src/test/kotlin/org/opensearch/replication/ReplicationHelpers.kt b/src/test/kotlin/org/opensearch/replication/ReplicationHelpers.kt index 7d76e3bb..4e83cab7 100644 --- a/src/test/kotlin/org/opensearch/replication/ReplicationHelpers.kt +++ b/src/test/kotlin/org/opensearch/replication/ReplicationHelpers.kt @@ -380,3 +380,19 @@ fun RestHighLevelClient.updateReplicationStartBlockSetting(enabled: Boolean) { val response = this.cluster().putSettings(updateSettingsRequest, RequestOptions.DEFAULT) assertThat(response.isAcknowledged).isTrue() } + +fun RestHighLevelClient.updateAutoFollowConcurrentStartReplicationJobSetting(concurrentJobs: Int?) 
{ + val settings = if(concurrentJobs != null) { + Settings.builder() + .put("plugins.replication.autofollow.concurrent_replication_jobs_trigger_size", concurrentJobs) + .build() + } else { + Settings.builder() + .putNull("plugins.replication.autofollow.concurrent_replication_jobs_trigger_size") + .build() + } + val updateSettingsRequest = ClusterUpdateSettingsRequest() + updateSettingsRequest.persistentSettings(settings) + val response = this.cluster().putSettings(updateSettingsRequest, RequestOptions.DEFAULT) + assertThat(response.isAcknowledged).isTrue() +} diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/ClusterRerouteFollowerIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/ClusterRerouteFollowerIT.kt index da6a2bf2..87cb313b 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/ClusterRerouteFollowerIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/ClusterRerouteFollowerIT.kt @@ -44,7 +44,7 @@ class ClusterRerouteFollowerIT : MultiClusterRestTestCase() { followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName)) insertDocToIndex(LEADER, "1", "dummy data 1",leaderIndexName) - //Querying ES cluster throws random exceptions like MasterNotDiscovered or ShardsFailed etc, so catching them and retrying + //Querying ES cluster throws random exceptions like ClusterManagerNotDiscovered or ShardsFailed etc, so catching them and retrying assertBusy ({ try { Assertions.assertThat(docs(FOLLOWER, followerIndexName)).contains("dummy data 1") diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/ClusterRerouteLeaderIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/ClusterRerouteLeaderIT.kt index 8ce57fc1..3d40ca89 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/ClusterRerouteLeaderIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/ClusterRerouteLeaderIT.kt @@ -44,7 +44,7 @@ class ClusterRerouteLeaderIT : 
MultiClusterRestTestCase() { followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName)) insertDocToIndex(LEADER, "1", "dummy data 1",leaderIndexName) - //Querying ES cluster throws random exceptions like MasterNotDiscovered or ShardsFailed etc, so catching them and retrying + //Querying ES cluster throws random exceptions like ClusterManagerNotDiscovered or ShardsFailed etc, so catching them and retrying assertBusy ({ try { Assertions.assertThat(docs(FOLLOWER, followerIndexName)).contains("dummy data 1") diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/ReplicationStatusIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/ReplicationStatusIT.kt new file mode 100644 index 00000000..4b4e3762 --- /dev/null +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/ReplicationStatusIT.kt @@ -0,0 +1,64 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. 
+ */ + +package org.opensearch.replication.integ.rest + +import org.assertj.core.api.Assertions +import org.junit.Assert +import org.opensearch.client.RequestOptions +import org.opensearch.client.ResponseException +import org.opensearch.client.indices.CreateIndexRequest +import org.opensearch.replication.MultiClusterAnnotations +import org.opensearch.replication.MultiClusterRestTestCase +import org.opensearch.replication.startReplication +import org.opensearch.replication.stopReplication +import org.opensearch.replication.StartReplicationRequest +import org.opensearch.replication.replicationStatus +import org.opensearch.replication.`validate status syncing response` +import java.util.concurrent.TimeUnit + +@MultiClusterAnnotations.ClusterConfigurations( + MultiClusterAnnotations.ClusterConfiguration(clusterName = LEADER), + MultiClusterAnnotations.ClusterConfiguration(clusterName = FOLLOWER) +) +class ReplicationStatusIT: MultiClusterRestTestCase() { + + fun `test replication status with valid params`() { + val followerClient = getClientForCluster(FOLLOWER) + val leaderClient = getClientForCluster(LEADER) + val indexName = "test-status-valid-param" + createConnectionBetweenClusters(FOLLOWER, LEADER) + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(indexName), RequestOptions.DEFAULT) + Assertions.assertThat(createIndexResponse.isAcknowledged).isTrue() + try { + followerClient.startReplication(StartReplicationRequest("source", indexName, indexName), waitForRestore = true) + assertBusy({ + var statusResp = followerClient.replicationStatus(indexName) + `validate status syncing response`(statusResp) + }, 30, TimeUnit.SECONDS) + } finally { + followerClient.stopReplication(indexName) + } + } + + fun `test replication status without valid params`() { + val followerClient = getClientForCluster(FOLLOWER) + createConnectionBetweenClusters(FOLLOWER, LEADER) + try { + followerClient.replicationStatus("") + Assert.fail("Status API shouldn't succeed 
in this case") + } catch (e: ResponseException) { + Assert.assertEquals(e.response.statusLine.statusCode, 400) + Assert.assertTrue(e.message != null) + Assert.assertTrue(e.message!!.contains("Index name must be specified to obtain replication status")) + } + } +} diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityCustomRolesIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityCustomRolesIT.kt index 8500c2eb..8d5be9cc 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityCustomRolesIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityCustomRolesIT.kt @@ -399,7 +399,7 @@ class SecurityCustomRolesIT: SecurityBase() { requestOptions = RequestOptions.DEFAULT.addBasicAuthHeader("testUser1","password")) insertDocToIndex(LEADER, "1", "dummy data 1",leaderIndexName) - //Querying ES cluster throws random exceptions like MasterNotDiscovered or ShardsFailed etc, so catching them and retrying + //Querying ES cluster throws random exceptions like ClusterManagerNotDiscovered or ShardsFailed etc, so catching them and retrying assertBusy ({ try { Assertions.assertThat(docs(FOLLOWER, followerIndexName)).contains("dummy data 1") diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityCustomRolesLeaderIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityCustomRolesLeaderIT.kt index 05ea40aa..af81a6b5 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityCustomRolesLeaderIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityCustomRolesLeaderIT.kt @@ -72,7 +72,7 @@ class SecurityCustomRolesLeaderIT: SecurityBase() { requestOptions = RequestOptions.DEFAULT.addBasicAuthHeader("testUser1","password")) insertDocToIndex(LEADER, "1", "dummy data 1",leaderIndexName) - //Querying ES cluster throws random exceptions like MasterNotDiscovered or ShardsFailed etc, so catching them and retrying + //Querying ES cluster throws random exceptions 
like ClusterManagerNotDiscovered or ShardsFailed etc, so catching them and retrying assertBusy ({ try { Assertions.assertThat(docs(FOLLOWER, followerIndexName)).contains("dummy data 1") diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt index c01f6b21..ceda4873 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt @@ -546,13 +546,16 @@ class StartReplicationIT: MultiClusterRestTestCase() { TimeUnit.SECONDS.sleep(SLEEP_TIME_BETWEEN_SYNC) getSettingsRequest.indices(followerIndexName) // Leader setting is copied - Assert.assertEquals( + assertBusy({ + Assert.assertEquals( "2", followerClient.indices() - .getSettings(getSettingsRequest, RequestOptions.DEFAULT) - .indexToSettings[followerIndexName][IndexMetadata.SETTING_NUMBER_OF_REPLICAS] - ) - assertEqualAliases() + .getSettings(getSettingsRequest, RequestOptions.DEFAULT) + .indexToSettings[followerIndexName][IndexMetadata.SETTING_NUMBER_OF_REPLICAS] + ) + assertEqualAliases() + }, 30L, TimeUnit.SECONDS) + // Case 2 : Blocklisted setting are not copied Assert.assertNull(followerClient.indices() @@ -580,28 +583,30 @@ class StartReplicationIT: MultiClusterRestTestCase() { TimeUnit.SECONDS.sleep(SLEEP_TIME_BETWEEN_SYNC) // Case 3 : Updated Settings take higher priority. 
Blocklisted settins shouldn't matter for that - Assert.assertEquals( + assertBusy({ + Assert.assertEquals( "3", followerClient.indices() - .getSettings(getSettingsRequest, RequestOptions.DEFAULT) - .indexToSettings[followerIndexName][IndexMetadata.SETTING_NUMBER_OF_REPLICAS] - ) + .getSettings(getSettingsRequest, RequestOptions.DEFAULT) + .indexToSettings[followerIndexName][IndexMetadata.SETTING_NUMBER_OF_REPLICAS] + ) - Assert.assertEquals( + Assert.assertEquals( "10s", followerClient.indices() - .getSettings(getSettingsRequest, RequestOptions.DEFAULT) - .indexToSettings[followerIndexName]["index.search.idle.after"] - ) + .getSettings(getSettingsRequest, RequestOptions.DEFAULT) + .indexToSettings[followerIndexName]["index.search.idle.after"] + ) - Assert.assertEquals( + Assert.assertEquals( "none", followerClient.indices() - .getSettings(getSettingsRequest, RequestOptions.DEFAULT) - .indexToSettings[followerIndexName]["index.routing.allocation.enable"] - ) + .getSettings(getSettingsRequest, RequestOptions.DEFAULT) + .indexToSettings[followerIndexName]["index.routing.allocation.enable"] + ) - assertEqualAliases() + assertEqualAliases() + }, 30L, TimeUnit.SECONDS) //Clear the settings settings = Settings.builder() @@ -987,25 +992,6 @@ class StartReplicationIT: MultiClusterRestTestCase() { } } - fun `test that replication cannot be started when soft delete is disabled`() { - val followerClient = getClientForCluster(FOLLOWER) - val leaderClient = getClientForCluster(LEADER) - - createConnectionBetweenClusters(FOLLOWER, LEADER) - - val settings: Settings = Settings.builder() - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.key, false) - .build() - - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName) - .settings(settings), RequestOptions.DEFAULT) - assertThat(createIndexResponse.isAcknowledged).isTrue() - - assertThatThrownBy { - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName)) 
- }.isInstanceOf(ResponseException::class.java).hasMessageContaining("Cannot Replicate an index where the setting index.soft_deletes.enabled is disabled") - } - fun `test leader stats`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/UpdateAutoFollowPatternIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/UpdateAutoFollowPatternIT.kt index 84db5426..d1a757bd 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/UpdateAutoFollowPatternIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/UpdateAutoFollowPatternIT.kt @@ -42,6 +42,7 @@ import org.opensearch.cluster.metadata.MetadataCreateIndexService import org.opensearch.replication.AutoFollowStats import org.opensearch.replication.ReplicationPlugin import org.opensearch.replication.updateReplicationStartBlockSetting +import org.opensearch.replication.updateAutoFollowConcurrentStartReplicationJobSetting import org.opensearch.replication.waitForShardTaskStart import org.opensearch.test.OpenSearchTestCase.assertBusy import java.lang.Thread.sleep @@ -369,6 +370,83 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() { } } + fun `test autofollow task with concurrent job setting set to run parallel jobs`() { + val followerClient = getClientForCluster(FOLLOWER) + val leaderClient = getClientForCluster(LEADER) + createConnectionBetweenClusters(FOLLOWER, LEADER, connectionAlias) + + // create two leader indices and test autofollow to trigger jobs based on setting + val leaderIndexName1 = createRandomIndex(leaderClient) + val leaderIndexName2 = createRandomIndex(leaderClient) + + followerClient.updateAutoFollowConcurrentStartReplicationJobSetting(2) + try { + followerClient.updateAutoFollowPattern(connectionAlias, indexPatternName, indexPattern) + + // Verify that existing indices matching the pattern are replicated. 
+ assertBusy { + Assertions.assertThat(followerClient.indices() + .exists(GetIndexRequest(leaderIndexName1), RequestOptions.DEFAULT)) + .isEqualTo(true) + } + + assertBusy { + Assertions.assertThat(followerClient.indices() + .exists(GetIndexRequest(leaderIndexName2), RequestOptions.DEFAULT)) + .isEqualTo(true) + } + + sleep(30000) // Default poll for auto follow in worst case + + Assertions.assertThat(getAutoFollowTasks(FOLLOWER).size).isEqualTo(1) + + } finally { + // Reset default autofollow setting + followerClient.updateAutoFollowConcurrentStartReplicationJobSetting(null) + followerClient.deleteAutoFollowPattern(connectionAlias, indexPatternName) + followerClient.stopReplication(leaderIndexName1) + followerClient.stopReplication(leaderIndexName2) + } + } + + fun `test autofollow task with concurrent job setting set to run single job`() { + val followerClient = getClientForCluster(FOLLOWER) + val leaderClient = getClientForCluster(LEADER) + createConnectionBetweenClusters(FOLLOWER, LEADER, connectionAlias) + + // create two leader indices and test autofollow to trigger jobs based on setting + val leaderIndexName1 = createRandomIndex(leaderClient) + val leaderIndexName2 = createRandomIndex(leaderClient) + + followerClient.updateAutoFollowConcurrentStartReplicationJobSetting(1) + try { + followerClient.updateAutoFollowPattern(connectionAlias, indexPatternName, indexPattern) + + // Verify that existing indices matching the pattern are replicated. 
+ assertBusy { + // check that the index replication task is created for only one index + Assertions.assertThat(getIndexReplicationTasks(FOLLOWER).size).isEqualTo(1) + } + + sleep(30000) // Default poll for auto follow in worst case + + assertBusy { + // check that the index replication task is created for only one index + Assertions.assertThat(getIndexReplicationTasks(FOLLOWER).size).isEqualTo(2) + } + + sleep(30000) // Default poll for auto follow in worst case + Assertions.assertThat(getAutoFollowTasks(FOLLOWER).size).isEqualTo(1) + + } finally { + // Reset default autofollow setting + followerClient.updateAutoFollowConcurrentStartReplicationJobSetting(null) + followerClient.deleteAutoFollowPattern(connectionAlias, indexPatternName) + followerClient.stopReplication(leaderIndexName1) + followerClient.stopReplication(leaderIndexName2) + } + } + fun createRandomIndex(client: RestHighLevelClient): String { val indexName = indexPrefix + randomAlphaOfLength(6).toLowerCase(Locale.ROOT) val createIndexResponse = client.indices().create(CreateIndexRequest(indexName), RequestOptions.DEFAULT) diff --git a/src/test/kotlin/org/opensearch/replication/task/index/NoOpClient.kt b/src/test/kotlin/org/opensearch/replication/task/index/NoOpClient.kt index 1666c5d3..1c92c0bb 100644 --- a/src/test/kotlin/org/opensearch/replication/task/index/NoOpClient.kt +++ b/src/test/kotlin/org/opensearch/replication/task/index/NoOpClient.kt @@ -129,7 +129,7 @@ open class NoOpClient(testName :String) : NoOpNodeClient(testName) { var bytesReference = replicationMetadata.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS) var by = BytesReference.bytes(bytesReference) - var result = GetResult(ReplicationMetadataStore.REPLICATION_CONFIG_SYSTEM_INDEX, "_doc", IndexReplicationTaskTests.followerIndex, 1, 1, 1, true, by, null, null) + var result = GetResult(ReplicationMetadataStore.REPLICATION_CONFIG_SYSTEM_INDEX, IndexReplicationTaskTests.followerIndex, 1, 1, 1, true, by, null, null) var 
getResponse = GetResponse(result) listener.onResponse(getResponse as Response) } else if (action == ClusterHealthAction.INSTANCE) { diff --git a/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt b/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt index 596cad01..ed5afb06 100644 --- a/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt +++ b/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt @@ -109,7 +109,7 @@ class TranslogSequencerTests : OpenSearchTestCase() { var seqNo = startSeqNo val changes = randomList(1, randomIntBetween(1, 512)) { seqNo = seqNo.inc() - Translog.Index("_doc", randomAlphaOfLength(10).toLowerCase(Locale.ROOT), seqNo, + Translog.Index(randomAlphaOfLength(10).toLowerCase(Locale.ROOT), seqNo, 1L, "{}".toByteArray(Charsets.UTF_8)) } return Pair(GetChangesResponse(changes, startSeqNo.inc(), startSeqNo, -1), seqNo) diff --git a/src/test/resources/replication/opensearch-cross-cluster-replication-1.1.0.0.zip b/src/test/resources/replication/opensearch-cross-cluster-replication-1.1.0.0.zip deleted file mode 100644 index 0e93dce1..00000000 Binary files a/src/test/resources/replication/opensearch-cross-cluster-replication-1.1.0.0.zip and /dev/null differ