diff --git a/src/main/kotlin/org/opensearch/replication/MappingNotAvailableException.kt b/src/main/kotlin/org/opensearch/replication/MappingNotAvailableException.kt new file mode 100644 index 00000000..d4817478 --- /dev/null +++ b/src/main/kotlin/org/opensearch/replication/MappingNotAvailableException.kt @@ -0,0 +1,21 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.replication; + +import org.opensearch.OpenSearchException + +public class MappingNotAvailableException: OpenSearchException { + + constructor(message: String, vararg args: Any) : super(message, *args) + + constructor(message: String, cause: Throwable, vararg args: Any) : super(message, cause, *args) +} diff --git a/src/main/kotlin/org/opensearch/replication/action/replay/TransportReplayChangesAction.kt b/src/main/kotlin/org/opensearch/replication/action/replay/TransportReplayChangesAction.kt index 0a64584f..170a46ec 100644 --- a/src/main/kotlin/org/opensearch/replication/action/replay/TransportReplayChangesAction.kt +++ b/src/main/kotlin/org/opensearch/replication/action/replay/TransportReplayChangesAction.kt @@ -11,6 +11,7 @@ package org.opensearch.replication.action.replay +import org.opensearch.replication.MappingNotAvailableException import org.opensearch.replication.metadata.UpdateMetadataAction import org.opensearch.replication.metadata.UpdateMetadataRequest import org.opensearch.replication.metadata.checkIfIndexBlockedWithLevel @@ -174,7 +175,12 @@ class TransportReplayChangesAction @Inject constructor(settings: Settings, trans val options = IndicesOptions.strictSingleIndexNoExpandForbidClosed() val getMappingsRequest = GetMappingsRequest().indices(leaderIndex).indicesOptions(options) val getMappingsResponse = remoteClient.suspending(remoteClient.admin().indices()::getMappings, injectSecurityContext = true)(getMappingsRequest) - val mappingSource = getMappingsResponse.mappings().get(leaderIndex).get(type).source().string() + val mappingSource = getMappingsResponse?.mappings()?.get(leaderIndex)?.get(type)?.source()?.string() + if (null == mappingSource) { + log.error("Mapping response: $getMappingsResponse") + throw MappingNotAvailableException("Mapping for the index $leaderIndex is not available") + } + // This should use MappingUpdateAction but that uses PutMappingRequest internally and // PutMappingRequest#setConcreteIndex has a bug where it throws an NPE.This is fixed upstream in diff --git a/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt b/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt index cad3f0ef..63c192d8 100644 --- a/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt +++ b/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt @@ -11,6 +11,7 @@ package org.opensearch.replication.task.shard +import org.opensearch.replication.MappingNotAvailableException import org.opensearch.replication.ReplicationException import org.opensearch.replication.action.changes.GetChangesResponse import org.opensearch.replication.action.replay.ReplayChangesAction @@ -28,6 +29,7 @@ import org.opensearch.common.logging.Loggers import org.opensearch.index.shard.ShardId import org.opensearch.index.translog.Translog import org.opensearch.tasks.TaskId +import java.util.ArrayList import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.TimeUnit @@ -65,7 +67,16 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata: replayRequest.parentTask = parentTaskId launch { var relativeStartNanos = System.nanoTime() - val replayResponse = client.suspendExecuteWithRetries(replicationMetadata, ReplayChangesAction.INSTANCE, replayRequest, log = log) + val retryOnExceptions = ArrayList>() + retryOnExceptions.add(MappingNotAvailableException::class.java) + + val replayResponse = client.suspendExecuteWithRetries( + replicationMetadata, + ReplayChangesAction.INSTANCE, + replayRequest, + log = log, + retryOn = retryOnExceptions + ) if (replayResponse.shardInfo.failed > 0) { replayResponse.shardInfo.failures.forEachIndexed { i, failure -> log.error("Failed replaying changes. Failure:$i:$failure")