[Backport 2.x] [Segment Replication] Backport PRs #3525 #3533 #3540 #3943 #3963 from main branch #4181

Merged
server/src/main/java/org/opensearch/OpenSearchException.java
@@ -67,6 +67,7 @@
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static java.util.Collections.unmodifiableMap;
import static org.opensearch.Version.V_2_1_0;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_UUID_NA_VALUE;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureFieldName;
@@ -1594,6 +1595,12 @@ private enum OpenSearchExceptionHandle {
org.opensearch.transport.NoSeedNodeLeftException::new,
160,
LegacyESVersion.V_7_10_0
),
REPLICATION_FAILED_EXCEPTION(
org.opensearch.indices.replication.common.ReplicationFailedException.class,
org.opensearch.indices.replication.common.ReplicationFailedException::new,
161,
V_2_1_0
);

final Class<? extends OpenSearchException> exceptionClass;
server/src/main/java/org/opensearch/index/engine/Engine.java
@@ -84,7 +84,6 @@
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogStats;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogStats;
import org.opensearch.search.suggest.completion.CompletionStats;

import java.io.Closeable;
server/src/main/java/org/opensearch/index/engine/InternalEngine.java
@@ -2285,7 +2285,7 @@ protected SegmentInfos getLastCommittedSegmentInfos() {
}

@Override
public SegmentInfos getLatestSegmentInfos() {
protected SegmentInfos getLatestSegmentInfos() {
OpenSearchDirectoryReader reader = null;
try {
reader = internalReaderManager.acquire();
64 changes: 57 additions & 7 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -162,9 +162,9 @@
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.rest.RestStatus;
@@ -1361,6 +1361,12 @@ public GatedCloseable<IndexCommit> acquireLastIndexCommit(boolean flushFirst) th
}
}

public void finalizeReplication(SegmentInfos infos, long seqNo) throws IOException {
if (getReplicationEngine().isPresent()) {
getReplicationEngine().get().updateSegments(infos, seqNo);
}
}
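
A hedged sketch of the replica-side call, for orientation (the caller shown here and the way infos and seqNo are obtained are assumptions, not part of this hunk):

    // On a replica, once the segment files referenced by a received checkpoint
    // have been copied locally ('infos' and 'seqNo' come from the primary):
    shard.finalizeReplication(infos, seqNo);
    // When the shard runs the replication engine this delegates to
    // NRTReplicationEngine.updateSegments(infos, seqNo); otherwise it is a no-op.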

/**
* Snapshots the most recent safe index commit from the currently running engine.
* All index files referenced by this index commit won't be freed until the commit/snapshot is closed.
@@ -1379,15 +1385,60 @@ public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineExcepti
* Returns the latest ReplicationCheckpoint that the shard has received
*/
public ReplicationCheckpoint getLatestReplicationCheckpoint() {
return new ReplicationCheckpoint(shardId, 0, 0, 0, 0);
try (final GatedCloseable<SegmentInfos> snapshot = getSegmentInfosSnapshot()) {
return Optional.ofNullable(snapshot.get())
.map(
segmentInfos -> new ReplicationCheckpoint(
this.shardId,
getOperationPrimaryTerm(),
segmentInfos.getGeneration(),
getProcessedLocalCheckpoint(),
segmentInfos.getVersion()
)
)
.orElse(
new ReplicationCheckpoint(
shardId,
getOperationPrimaryTerm(),
SequenceNumbers.NO_OPS_PERFORMED,
getProcessedLocalCheckpoint(),
SequenceNumbers.NO_OPS_PERFORMED
)
);
} catch (IOException ex) {
throw new OpenSearchException("Error Closing SegmentInfos Snapshot", ex);
}
}

/**
* Invoked when a new checkpoint is received from a primary shard. Starts the copy process.
*/
public synchronized void onNewCheckpoint(final PublishCheckpointRequest request) {
assert shardRouting.primary() == false;
// TODO

/**
* Checks if checkpoint should be processed
*
* @param requestCheckpoint received checkpoint that is checked for processing
* @return true if checkpoint should be processed
*/
public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) {
if (state().equals(IndexShardState.STARTED) == false) {
logger.trace(() -> new ParameterizedMessage("Ignoring new replication checkpoint - shard is not started {}", state()));
return false;
}
ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint();
if (localCheckpoint.isAheadOf(requestCheckpoint)) {
logger.trace(
() -> new ParameterizedMessage(
"Ignoring new replication checkpoint - Shard is already on checkpoint {} that is ahead of {}",
localCheckpoint,
requestCheckpoint
)
);
return false;
}
if (localCheckpoint.equals(requestCheckpoint)) {
logger.trace(
() -> new ParameterizedMessage("Ignoring new replication checkpoint - Shard is already on checkpoint {}", requestCheckpoint)
);
return false;
}
return true;
}
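
A minimal usage sketch, assuming a replica-side checkpoint listener (the listener and its startReplication hook are hypothetical; only shouldProcessCheckpoint comes from this PR):

    void onNewCheckpoint(ReplicationCheckpoint received, IndexShard replica) {
        // Skip checkpoints the shard cannot or need not process:
        // the shard is not started, or is already at or ahead of the received checkpoint.
        if (replica.shouldProcessCheckpoint(received) == false) {
            return;
        }
        startReplication(received, replica); // assumed entry point into the copy process
    }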

/**
@@ -2657,7 +2708,6 @@ public long getLocalCheckpoint() {
* Also see {@link #getLocalCheckpoint()}.
*/
public long getProcessedLocalCheckpoint() {
assert indexSettings.isSegRepEnabled();
@Rishikesh1159 (Member Author) commented on Aug 10, 2022:

This assert has been removed because it was failing many unit tests in SegmentReplicationTargetServiceTests.java and SegmentReplicationIndexShardTests.java. The unit tests in those classes don't set the index replication type to SEGMENT, so every shard fails the segRepEnabled() assertion and the tests never exercise what they are meant to test.

This assert only exists on the 2.x branch because of this backport PR: the original PR declared getProcessedLocalCheckpoint() as abstract in Engine.java, which is a breaking change. To avoid breaking changes we made a few adjustments to the original PR while backporting, and this assert was part of them. It is not necessary to keep it, as it will be removed in OpenSearch 3.0 anyway.

A member commented:

I added the SEGMENT index setting in the test classes. Can we backport #4163 if it helps?

@Rishikesh1159 (Member Author) replied on Aug 10, 2022:

I think it might help, actually. Let me check by backporting your PR locally and adding the assert statement back, to see if all of the unit tests pass.

@Rishikesh1159 (Member Author) replied:

I tried adding the assert back and using SEGMENT replication as the index setting in the unit tests, but got this error:

REPRODUCE WITH: ./gradlew ':server:test' --tests "org.opensearch.indices.replication.OngoingSegmentReplicationsTests.testMultipleReplicasUseSameCheckpoint" -Dtests.seed=A0971E691B2879E0 -Dtests.security.manager=true -Dtests.jvm.argline="-XX:TieredStopAtLevel=1 -XX:ReservedCodeCacheSize=64m" -Dtests.locale=ms-MY -Dtests.timezone=Asia/Kolkata -Druntime.java=17
  2> java.lang.AssertionError
        at __randomizedtesting.SeedInfo.seed([A0971E691B2879E0:A4046FDAE5B7632]:0)
        at org.opensearch.index.shard.IndexShard.getProcessedLocalCheckpoint(IndexShard.java:2711)
        at org.opensearch.indices.replication.OngoingSegmentReplicationsTests.setUp(OngoingSegmentReplicationsTests.java:73)

So there are multiple places where we would have to make changes for this to work, and I don't think this PR is the right place to do all of them. We can do them while backporting other PRs. For now, I am removing the assert statement.

// Returns checkpoint only if the current engine is an instance of NRTReplicationEngine or InternalEngine
return getReplicationEngine().map(NRTReplicationEngine::getProcessedLocalCheckpoint).orElseGet(() -> {
final Engine engine = getEngine();
46 changes: 46 additions & 0 deletions server/src/main/java/org/opensearch/index/store/Store.java
@@ -64,6 +64,7 @@
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.Version;
import org.opensearch.ExceptionsHelper;
import org.opensearch.common.Nullable;
import org.opensearch.common.UUIDs;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.Streams;
@@ -706,6 +707,51 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr
}
}

/**
* This method deletes every file in this store that is not contained in either the remote or local metadata snapshots.
* This method is used for segment replication when the in memory SegmentInfos can be ahead of the on disk segment file.
* In this case files from both snapshots must be preserved. Verification has been done that all files are present on disk.
* @param reason the reason for this cleanup operation logged for each deleted file
* @param localSnapshot The local snapshot from in memory SegmentInfos.
* @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup.
*/
public void cleanupAndPreserveLatestCommitPoint(String reason, MetadataSnapshot localSnapshot) throws IOException {
// fetch a snapshot from the latest on disk Segments_N file. This can be behind
// the passed in local in memory snapshot, so we want to ensure files it references are not removed.
metadataLock.writeLock().lock();
try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
cleanupFiles(reason, localSnapshot, getMetadata(readLastCommittedSegmentsInfo()));
} finally {
metadataLock.writeLock().unlock();
}
}
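
A sketch of the intended call pattern on a replica, assuming the getMetadata(SegmentInfos) overload used above is accessible to the caller and that 'infos' holds the in-memory SegmentInfos received from the primary:

    store.incRef();
    try {
        // Prune files referenced by neither the new in-memory infos
        // nor the last on-disk commit point.
        store.cleanupAndPreserveLatestCommitPoint("finalize replication", store.getMetadata(infos));
    } finally {
        store.decRef();
    }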

private void cleanupFiles(String reason, MetadataSnapshot localSnapshot, @Nullable MetadataSnapshot additionalSnapshot)
throws IOException {
assert metadataLock.isWriteLockedByCurrentThread();
for (String existingFile : directory.listAll()) {
if (Store.isAutogenerated(existingFile)
|| localSnapshot.contains(existingFile)
|| (additionalSnapshot != null && additionalSnapshot.contains(existingFile))) {
// don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete
// checksum)
continue;
}
try {
directory.deleteFile(reason, existingFile);
} catch (IOException ex) {
if (existingFile.startsWith(IndexFileNames.SEGMENTS) || existingFile.startsWith(CORRUPTED_MARKER_NAME_PREFIX)) {
// TODO do we need to also fail this if we can't delete the pending commit file?
// if one of those files can't be deleted we better fail the cleanup otherwise we might leave an old commit
// point around?
throw new IllegalStateException("Can't delete " + existingFile + " - cleanup failed", ex);
}
logger.debug(() -> new ParameterizedMessage("failed to delete file [{}]", existingFile), ex);
// ignore, we don't really care, will get deleted later on
}
}
}

// pkg private for testing
final void verifyAfterCleanup(MetadataSnapshot sourceMetadata, MetadataSnapshot targetMetadata) {
final RecoveryDiff recoveryDiff = targetMetadata.recoveryDiff(sourceMetadata);
server/src/main/java/org/opensearch/indices/IndicesModule.java
@@ -284,6 +284,8 @@ protected void configure() {
bind(RetentionLeaseSyncer.class).asEagerSingleton();
if (FeatureFlags.isEnabled(FeatureFlags.REPLICATION_TYPE)) {
bind(SegmentReplicationCheckpointPublisher.class).asEagerSingleton();
} else {
bind(SegmentReplicationCheckpointPublisher.class).toInstance(SegmentReplicationCheckpointPublisher.EMPTY);
}
}

Expand Down
server/src/main/java/org/opensearch/indices/RunUnderPrimaryPermit.java
@@ -0,0 +1,72 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices;

import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionListener;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.common.util.concurrent.FutureUtils;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardRelocatedException;
import org.opensearch.threadpool.ThreadPool;

import java.util.concurrent.CompletableFuture;

/**
* Execute a Runnable after acquiring the primary's operation permit.
*
* @opensearch.internal
*/
public final class RunUnderPrimaryPermit {

public static void run(
CancellableThreads.Interruptible runnable,
String reason,
IndexShard primary,
CancellableThreads cancellableThreads,
Logger logger
) {
cancellableThreads.execute(() -> {
CompletableFuture<Releasable> permit = new CompletableFuture<>();
final ActionListener<Releasable> onAcquired = new ActionListener<>() {
@Override
public void onResponse(Releasable releasable) {
if (permit.complete(releasable) == false) {
releasable.close();
}
}

@Override
public void onFailure(Exception e) {
permit.completeExceptionally(e);
}
};
primary.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME, reason);
try (Releasable ignored = FutureUtils.get(permit)) {
// check that the IndexShard still has the primary authority. This needs to be checked under operation permit to prevent
// races, as IndexShard will switch its authority only when it holds all operation permits, see IndexShard.relocated()
if (primary.isRelocatedPrimary()) {
throw new IndexShardRelocatedException(primary.shardId());
}
runnable.run();
} finally {
// just in case we got an exception (likely interrupted) while waiting for the get
permit.whenComplete((r, e) -> {
if (r != null) {
r.close();
}
if (e != null) {
logger.trace("suppressing exception on completion (it was already bubbled up or the operation was aborted)", e);
}
});
}
});
}
}
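
A usage sketch; the work item is illustrative and mirrors how recovery code runs shard operations under a primary permit (shard, targetAllocationId, and logger are assumed to be in scope):

    CancellableThreads cancellableThreads = new CancellableThreads();
    RunUnderPrimaryPermit.run(
        () -> shard.initiateTracking(targetAllocationId), // work that must hold the permit
        "initiate tracking of " + targetAllocationId,     // reason recorded for the acquisition
        shard,
        cancellableThreads,
        logger
    );

The CompletableFuture bridges the listener-based permit acquisition back to the calling thread, and the relocation check is deliberately done while holding the permit: IndexShard only hands off primary authority once it owns all permits, so the check cannot race with relocation.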
server/src/main/java/org/opensearch/indices/recovery/FileChunkWriter.java
@@ -0,0 +1,31 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.recovery;

import org.opensearch.action.ActionListener;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.index.store.StoreFileMetadata;

/**
* Writes a partial file chunk to the target store.
*
* @opensearch.internal
*/
@FunctionalInterface
public interface FileChunkWriter {

void writeFileChunk(
StoreFileMetadata fileMetadata,
long position,
BytesReference content,
boolean lastChunk,
int totalTranslogOps,
ActionListener<Void> listener
);
}
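
Because the interface is a @FunctionalInterface, an implementation can be supplied as a lambda; a minimal sketch (the body is illustrative, not a real transport call; logger is assumed in scope):

    FileChunkWriter chunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> {
        // e.g. forward the chunk to the target shard over the transport layer
        logger.trace("chunk of [{}] at offset [{}], lastChunk [{}]", fileMetadata.name(), position, lastChunk);
        listener.onResponse(null); // signal that the chunk was handled
    };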