Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Implement segment replication event cancellation. #4387

Merged
merged 1 commit into from
Sep 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
### Fixed
- PR reference to checkout code for changelog verifier ([#4296](https://github.com/opensearch-project/OpenSearch/pull/4296))
- Restore using the class ClusterInfoRequest and ClusterInfoRequestBuilder from package 'org.opensearch.action.support.master.info' for subclasses ([#4324](https://github.com/opensearch-project/OpenSearch/pull/4324))
- Fixed cancellation of segment replication events ([#4225](https://github.com/opensearch-project/OpenSearch/pull/4225))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.SegmentReplicationSourceService;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.ReplicationState;
Expand Down Expand Up @@ -152,6 +153,7 @@ public IndicesClusterStateService(
final ThreadPool threadPool,
final PeerRecoveryTargetService recoveryTargetService,
final SegmentReplicationTargetService segmentReplicationTargetService,
final SegmentReplicationSourceService segmentReplicationSourceService,
final ShardStateAction shardStateAction,
final NodeMappingRefreshAction nodeMappingRefreshAction,
final RepositoriesService repositoriesService,
Expand All @@ -170,6 +172,7 @@ public IndicesClusterStateService(
threadPool,
checkpointPublisher,
segmentReplicationTargetService,
segmentReplicationSourceService,
recoveryTargetService,
shardStateAction,
nodeMappingRefreshAction,
Expand All @@ -191,6 +194,7 @@ public IndicesClusterStateService(
final ThreadPool threadPool,
final SegmentReplicationCheckpointPublisher checkpointPublisher,
final SegmentReplicationTargetService segmentReplicationTargetService,
final SegmentReplicationSourceService segmentReplicationSourceService,
final PeerRecoveryTargetService recoveryTargetService,
final ShardStateAction shardStateAction,
final NodeMappingRefreshAction nodeMappingRefreshAction,
Expand All @@ -211,6 +215,7 @@ public IndicesClusterStateService(
// if segrep feature flag is not enabled, don't wire the target serivce as an IndexEventListener.
if (FeatureFlags.isEnabled(FeatureFlags.REPLICATION_TYPE)) {
indexEventListeners.add(segmentReplicationTargetService);
indexEventListeners.add(segmentReplicationSourceService);
}
this.builtInIndexListener = Collections.unmodifiableList(indexEventListeners);
this.indicesService = indicesService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
* @opensearch.internal
*/
class OngoingSegmentReplications {

private final RecoverySettings recoverySettings;
private final IndicesService indicesService;
private final Map<ReplicationCheckpoint, CopyState> copyStateMap;
Expand Down Expand Up @@ -161,14 +160,27 @@ synchronized void cancel(IndexShard shard, String reason) {
cancelHandlers(handler -> handler.getCopyState().getShard().shardId().equals(shard.shardId()), reason);
}

/**
* Cancel all Replication events for the given allocation ID, intended to be called when a primary is shutting down.
*
* @param allocationId {@link String} - Allocation ID.
* @param reason {@link String} - Reason for the cancel
*/
synchronized void cancel(String allocationId, String reason) {
final SegmentReplicationSourceHandler handler = allocationIdToHandlers.remove(allocationId);
if (handler != null) {
handler.cancel(reason);
removeCopyState(handler.getCopyState());
}
}

/**
* Cancel any ongoing replications for a given {@link DiscoveryNode}
*
* @param node {@link DiscoveryNode} node for which to cancel replication events.
*/
void cancelReplication(DiscoveryNode node) {
cancelHandlers(handler -> handler.getTargetNode().equals(node), "Node left");

}

/**
Expand Down Expand Up @@ -243,11 +255,7 @@ private void cancelHandlers(Predicate<? super SegmentReplicationSourceHandler> p
.map(SegmentReplicationSourceHandler::getAllocationId)
.collect(Collectors.toList());
for (String allocationId : allocationIds) {
final SegmentReplicationSourceHandler handler = allocationIdToHandlers.remove(allocationId);
if (handler != null) {
handler.cancel(reason);
removeCopyState(handler.getCopyState());
}
cancel(allocationId, reason);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,10 @@ public void getSegmentFiles(
);
transportClient.executeRetryableAction(GET_SEGMENT_FILES, request, responseListener, reader);
}

@Override
public void cancel() {
transportClient.cancel();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.indices.replication;

import org.opensearch.action.ActionListener;
import org.opensearch.common.util.CancellableThreads.ExecutionCancelledException;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
Expand Down Expand Up @@ -47,4 +48,9 @@ void getSegmentFiles(
Store store,
ActionListener<GetSegmentFilesResponse> listener
);

/**
* Cancel any ongoing requests, should resolve any ongoing listeners with onFailure with a {@link ExecutionCancelledException}.
*/
default void cancel() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,16 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene
final Closeable releaseResources = () -> IOUtils.close(resources);
try {
timer.start();
cancellableThreads.setOnCancel((reason, beforeCancelEx) -> {
final RuntimeException e = new CancellableThreads.ExecutionCancelledException(
"replication was canceled reason [" + reason + "]"
);
if (beforeCancelEx != null) {
e.addSuppressed(beforeCancelEx);
}
IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e));
throw e;
});
final Consumer<Exception> onFailure = e -> {
assert Transports.assertNotTransportThread(SegmentReplicationSourceHandler.this + "[onFailure]");
IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e));
Expand Down Expand Up @@ -153,6 +163,7 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene
final MultiChunkTransfer<StoreFileMetadata, SegmentFileTransferHandler.FileChunk> transfer = segmentFileTransferHandler
.createTransfer(shard.store(), storeFileMetadata, () -> 0, sendFileStep);
resources.add(transfer);
cancellableThreads.checkForCancel();
transfer.start();

sendFileStep.whenComplete(r -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterStateListener;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.component.AbstractLifecycleComponent;
Expand Down Expand Up @@ -42,7 +43,25 @@
*
* @opensearch.internal
*/
public final class SegmentReplicationSourceService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener {
public class SegmentReplicationSourceService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener {

// Empty Implementation, only required while Segment Replication is under feature flag.
public static final SegmentReplicationSourceService NO_OP = new SegmentReplicationSourceService() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
// NoOp;
}

@Override
public void beforeIndexShardClosed(ShardId shardId, IndexShard indexShard, Settings indexSettings) {
// NoOp;
}

@Override
public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting oldRouting, ShardRouting newRouting) {
// NoOp;
}
};

private static final Logger logger = LogManager.getLogger(SegmentReplicationSourceService.class);
private final RecoverySettings recoverySettings;
Expand All @@ -62,6 +81,14 @@ public static class Actions {

private final OngoingSegmentReplications ongoingSegmentReplications;

// Used only for empty implementation.
private SegmentReplicationSourceService() {
recoverySettings = null;
ongoingSegmentReplications = null;
transportService = null;
indicesService = null;
}

public SegmentReplicationSourceService(
IndicesService indicesService,
TransportService transportService,
Expand Down Expand Up @@ -163,10 +190,25 @@ protected void doClose() throws IOException {

}

/**
*
* Cancels any replications on this node to a replica shard that is about to be closed.
*/
@Override
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {
if (indexShard != null) {
ongoingSegmentReplications.cancel(indexShard, "shard is closed");
}
}

/**
* Cancels any replications on this node to a replica that has been promoted as primary.
*/
@Override
public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting oldRouting, ShardRouting newRouting) {
if (indexShard != null && oldRouting.primary() == false && newRouting.primary()) {
ongoingSegmentReplications.cancel(indexShard.routingEntry().allocationId().getId(), "Relocating primary shard.");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public enum Stage {
GET_CHECKPOINT_INFO((byte) 3),
FILE_DIFF((byte) 4),
GET_FILES((byte) 5),
FINALIZE_REPLICATION((byte) 6);
FINALIZE_REPLICATION((byte) 6),
CANCELLED((byte) 7);

private static final Stage[] STAGES = new Stage[Stage.values().length];

Expand Down Expand Up @@ -118,6 +119,10 @@ protected void validateAndSetStage(Stage expected, Stage next) {
"can't move replication to stage [" + next + "]. current stage: [" + stage + "] (expected [" + expected + "])"
);
}
stopTimersAndSetStage(next);
}

private void stopTimersAndSetStage(Stage next) {
// save the timing data for the current step
stageTimer.stop();
timingData.add(new Tuple<>(stage.name(), stageTimer.time()));
Expand Down Expand Up @@ -155,6 +160,14 @@ public void setStage(Stage stage) {
overallTimer.stop();
timingData.add(new Tuple<>("OVERALL", overallTimer.time()));
break;
case CANCELLED:
if (this.stage == Stage.DONE) {
throw new IllegalStateException("can't move replication to Cancelled state from Done.");
}
stopTimersAndSetStage(Stage.CANCELLED);
overallTimer.stop();
timingData.add(new Tuple<>("OVERALL", overallTimer.time()));
break;
default:
throw new IllegalArgumentException("unknown SegmentReplicationState.Stage [" + stage + "]");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.apache.lucene.store.ByteBuffersDataInput;
import org.apache.lucene.store.ByteBuffersIndexInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
Expand Down Expand Up @@ -103,7 +104,15 @@ public String description() {

@Override
public void notifyListener(OpenSearchException e, boolean sendShardFailure) {
listener.onFailure(state(), e, sendShardFailure);
// Cancellations still are passed to our SegmentReplicationListner as failures, if we have failed because of cancellation
// update the stage.
final Throwable cancelledException = ExceptionsHelper.unwrap(e, CancellableThreads.ExecutionCancelledException.class);
if (cancelledException != null) {
state.setStage(SegmentReplicationState.Stage.CANCELLED);
listener.onFailure(state(), (CancellableThreads.ExecutionCancelledException) cancelledException, sendShardFailure);
} else {
listener.onFailure(state(), e, sendShardFailure);
}
}

@Override
Expand Down Expand Up @@ -134,13 +143,22 @@ public void writeFileChunk(
* @param listener {@link ActionListener} listener.
*/
public void startReplication(ActionListener<Void> listener) {
cancellableThreads.setOnCancel((reason, beforeCancelEx) -> {
// This method only executes when cancellation is triggered by this node and caught by a call to checkForCancel,
// SegmentReplicationSource does not share CancellableThreads.
final CancellableThreads.ExecutionCancelledException executionCancelledException =
new CancellableThreads.ExecutionCancelledException("replication was canceled reason [" + reason + "]");
notifyListener(executionCancelledException, false);
throw executionCancelledException;
});
state.setStage(SegmentReplicationState.Stage.REPLICATING);
final StepListener<CheckpointInfoResponse> checkpointInfoListener = new StepListener<>();
final StepListener<GetSegmentFilesResponse> getFilesListener = new StepListener<>();
final StepListener<Void> finalizeListener = new StepListener<>();

logger.trace("[shardId {}] Replica starting replication [id {}]", shardId().getId(), getId());
// Get list of files to copy from this checkpoint.
cancellableThreads.checkForCancel();
state.setStage(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO);
source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener);

Expand All @@ -154,6 +172,7 @@ public void startReplication(ActionListener<Void> listener) {

private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener<GetSegmentFilesResponse> getFilesListener)
throws IOException {
cancellableThreads.checkForCancel();
state.setStage(SegmentReplicationState.Stage.FILE_DIFF);
final Store.MetadataSnapshot snapshot = checkpointInfo.getSnapshot();
Store.MetadataSnapshot localMetadata = getMetadataSnapshot();
Expand Down Expand Up @@ -188,12 +207,14 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener<GetSeg
}
// always send a req even if not fetching files so the primary can clear the copyState for this shard.
state.setStage(SegmentReplicationState.Stage.GET_FILES);
cancellableThreads.checkForCancel();
source.getSegmentFiles(getId(), checkpointInfo.getCheckpoint(), filesToFetch, store, getFilesListener);
}

private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, ActionListener<Void> listener) {
state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION);
ActionListener.completeWith(listener, () -> {
cancellableThreads.checkForCancel();
state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION);
multiFileWriter.renameAllTempFiles();
final Store store = store();
store.incRef();
Expand Down Expand Up @@ -261,4 +282,10 @@ Store.MetadataSnapshot getMetadataSnapshot() throws IOException {
}
return store.getMetadata(indexShard.getSegmentInfosSnapshot().get());
}

@Override
protected void onCancel(String reason) {
cancellableThreads.cancel(reason);
source.cancel();
}
}
Loading