diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index 3d74feddfa261..5ca2d93831506 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -84,6 +84,7 @@ import org.opensearch.discovery.PeerFinder; import org.opensearch.discovery.SeedHostsProvider; import org.opensearch.discovery.SeedHostsResolver; +import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.monitor.NodeHealthService; import org.opensearch.monitor.StatusInfo; import org.opensearch.node.remotestore.RemoteStoreNodeService; @@ -207,7 +208,8 @@ public Coordinator( ElectionStrategy electionStrategy, NodeHealthService nodeHealthService, PersistedStateRegistry persistedStateRegistry, - RemoteStoreNodeService remoteStoreNodeService + RemoteStoreNodeService remoteStoreNodeService, + RemoteClusterStateService remoteClusterStateService ) { this.settings = settings; this.transportService = transportService; @@ -259,7 +261,8 @@ public Coordinator( transportService, namedWriteableRegistry, this::handlePublishRequest, - this::handleApplyCommit + this::handleApplyCommit, + remoteClusterStateService ); this.leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, this::onLeaderFailure, nodeHealthService); this.followersChecker = new FollowersChecker( diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index 1fdaeead0d28d..d65d730213ad4 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -31,6 +31,7 @@ package org.opensearch.cluster.coordination; +import java.util.Optional; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; @@ -47,6 +48,8 @@ import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.transport.TransportResponse; +import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.BytesTransportRequest; import org.opensearch.transport.TransportChannel; @@ -97,16 +100,19 @@ public class PublicationTransportHandler { private final TransportRequestOptions stateRequestOptions = TransportRequestOptions.builder() .withType(TransportRequestOptions.Type.STATE) .build(); + private final RemoteClusterStateService remoteClusterStateService; public PublicationTransportHandler( TransportService transportService, NamedWriteableRegistry namedWriteableRegistry, Function handlePublishRequest, - BiConsumer> handleApplyCommit + BiConsumer> handleApplyCommit, + RemoteClusterStateService remoteClusterStateService ) { this.transportService = transportService; this.namedWriteableRegistry = namedWriteableRegistry; this.handlePublishRequest = handlePublishRequest; + this.remoteClusterStateService = remoteClusterStateService; transportService.registerRequestHandler( PUBLISH_STATE_ACTION_NAME, @@ -211,6 +217,40 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque } } + private PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException { + final Optional manifestOptional = remoteClusterStateService.getClusterMetadataManifestByTermVersion(request.getClusterName(), request.getClusterUUID(), request.term, request.version); + if (manifestOptional.isPresent() == false) { + // todo change exception + throw new IncompatibleClusterStateVersionException("No remote state for term version"); + } + ClusterMetadataManifest manifest = manifestOptional.get(); + boolean applyFullState = false; + final ClusterState lastSeen = lastSeenClusterState.get(); + if (lastSeen == null) { + logger.debug("Diff cannot be applied as there is not last cluster state"); + applyFullState = true; + } else if (manifest.getDiffManifest() == null) { + logger.debug("There is no diff in the manifest"); + applyFullState = true; + } else if (manifest.getDiffManifest().getFromStateUUID().equals(lastSeen.stateUUID()) == false) { + logger.debug("Last cluster state not compatible with the diff"); + applyFullState = true; + } else { + ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(request.getClusterName(), manifest, lastSeenClusterState.get()); + final PublishWithJoinResponse response = acceptState(clusterState); + lastSeenClusterState.compareAndSet(lastSeen, clusterState); + return response; + } + + if (applyFullState == true) { + ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(request.getClusterName(), manifest); + logger.debug("Downloaded full cluster state version [{}]", clusterState.version()); + final PublishWithJoinResponse response = acceptState(clusterState); + lastSeenClusterState.set(clusterState); + return response; + } + } + private PublishWithJoinResponse acceptState(ClusterState incomingState) { // if the state is coming from the current node, use original request instead (see currentPublishRequestToSelf for explanation) if (transportService.getLocalNode().equals(incomingState.nodes().getClusterManagerNode())) { diff --git a/server/src/main/java/org/opensearch/cluster/coordination/RemotePublishRequest.java b/server/src/main/java/org/opensearch/cluster/coordination/RemotePublishRequest.java new file mode 100644 index 0000000000000..b1981c0b4b1a0 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/coordination/RemotePublishRequest.java @@ -0,0 +1,48 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.coordination; + +import java.io.IOException; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +public class RemotePublishRequest extends TermVersionRequest { + + // todo Do we need cluster name and UUID ? + private final String clusterName; + private final String clusterUUID; + + public RemotePublishRequest(DiscoveryNode sourceNode, long term, long version, String clusterName, String clusterUUID) { + super(sourceNode, term, version); + this.clusterName = clusterName; + this.clusterUUID = clusterUUID; + } + + public RemotePublishRequest(StreamInput in) throws IOException { + super(in); + this.clusterName = in.readString(); + this.clusterUUID = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(clusterName); + out.writeString(clusterUUID); + } + + public String getClusterName() { + return clusterName; + } + + public String getClusterUUID() { + return clusterUUID; + } +} diff --git a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java index 288371aa240a0..2edb9a61a85a8 100644 --- a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java @@ -52,6 +52,7 @@ import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.gateway.GatewayMetaState; +import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.monitor.NodeHealthService; import org.opensearch.node.remotestore.RemoteStoreNodeService; import org.opensearch.plugins.DiscoveryPlugin; @@ -133,7 +134,8 @@ public DiscoveryModule( RerouteService rerouteService, NodeHealthService nodeHealthService, PersistedStateRegistry persistedStateRegistry, - RemoteStoreNodeService remoteStoreNodeService + RemoteStoreNodeService remoteStoreNodeService, + RemoteClusterStateService remoteClusterStateService ) { final Collection> joinValidators = new ArrayList<>(); final Map> hostProviders = new HashMap<>(); @@ -211,7 +213,8 @@ public DiscoveryModule( electionStrategy, nodeHealthService, persistedStateRegistry, - remoteStoreNodeService + remoteStoreNodeService, + remoteClusterStateService ); } else { throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]"); diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java index dccb11f25e5ef..bd189461620c1 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java @@ -1081,5 +1081,45 @@ public List findUpdatedIndices(Map indices, Map getCustomMetadataUpdated() { + return customMetadataUpdated; + } + + public List getIndicesUpdated() { + return indicesUpdated; + } + + public List getIndicesDeleted() { + return indicesDeleted; + } + + public boolean isClusterBlocksUpdated() { + return clusterBlocksUpdated; + } + + public boolean isDiscoveryNodesUpdated() { + return discoveryNodesUpdated; + } } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index eb21359b0512f..a31e336407e8c 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -13,8 +13,10 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.coordination.CoordinationMetadata; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.TemplatesMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.CheckedRunnable; import org.opensearch.common.Nullable; @@ -29,6 +31,7 @@ import org.opensearch.common.util.concurrent.AbstractAsyncTask; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.action.ActionListener; +import org.opensearch.gateway.remote.ClusterMetadataManifest.ClusterDiffManifest; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; import org.opensearch.index.translog.transfer.BlobStoreTransferService; @@ -664,6 +667,10 @@ public Optional getLatestClusterMetadataManifest(String return remoteManifestManager.getLatestClusterMetadataManifest(clusterName, clusterUUID); } + public Optional getClusterMetadataManifestByTermVersion(String clusterName, String clusterUUID, long term, long version) { + return remoteManifestManager.getClusterMetadataManifestByTermVersion(clusterName, clusterUUID, term, version); + } + @Override public void close() throws IOException { if (staleFileDeletionTask != null) { @@ -728,25 +735,65 @@ public ClusterState getLatestClusterState(String clusterName, String clusterUUID ); } + return getClusterStateForManifest(clusterName, clusterMetadataManifest.get()); + } + + public ClusterState getClusterStateForManifest(String clusterName, ClusterMetadataManifest manifest) { + // todo make this async // Fetch Global Metadata - Metadata globalMetadata = remoteGlobalMetadataManager.getGlobalMetadata(clusterName, clusterUUID, clusterMetadataManifest.get()); + Metadata globalMetadata = remoteGlobalMetadataManager.getGlobalMetadata(clusterName, manifest.getClusterUUID(), manifest); // Fetch Index Metadata Map indices = remoteIndexMetadataManager.getIndexMetadataMap( clusterName, - clusterUUID, - clusterMetadataManifest.get() + manifest.getClusterUUID(), + manifest ); Map indexMetadataMap = new HashMap<>(); indices.values().forEach(indexMetadata -> { indexMetadataMap.put(indexMetadata.getIndex().getName(), indexMetadata); }); return ClusterState.builder(ClusterState.EMPTY_STATE) - .version(clusterMetadataManifest.get().getStateVersion()) + .version(manifest.getStateVersion()) .metadata(Metadata.builder(globalMetadata).indices(indexMetadataMap).build()) .build(); } + public ClusterState getClusterStateUsingDiff(String clusterName, ClusterMetadataManifest manifest, ClusterState previousState) { + assert manifest.getDiffManifest() != null; + ClusterDiffManifest diff = manifest.getDiffManifest(); + ClusterState.Builder clusterStateBuilder = ClusterState.builder(previousState); + Metadata.Builder metadataBuilder = Metadata.builder(previousState.metadata()); + + for (String index:diff.getIndicesUpdated()) { + //todo optimize below iteration + Optional uploadedIndexMetadataOptional = manifest.getIndices().stream().filter(idx -> idx.getIndexName().equals(index)).findFirst(); + assert uploadedIndexMetadataOptional.isPresent() == true; + IndexMetadata indexMetadata = remoteIndexMetadataManager.getIndexMetadata(clusterName, manifest.getClusterUUID(), uploadedIndexMetadataOptional.get()); + metadataBuilder.put(indexMetadata, false); + } + for (String index:diff.getIndicesDeleted()) { + metadataBuilder.remove(index); + } + if (diff.isCoordinationMetadataUpdated()) { + CoordinationMetadata coordinationMetadata = remoteGlobalMetadataManager.getCoordinationMetadata(clusterName, manifest.getClusterUUID(), manifest.getCoordinationMetadata().getUploadedFilename()); + metadataBuilder.coordinationMetadata(coordinationMetadata); + } + if (diff.isSettingsMetadataUpdated()) { + Settings settings = remoteGlobalMetadataManager.getSettingsMetadata(clusterName, manifest.getClusterUUID(), manifest.getSettingsMetadata().getUploadedFilename()); + metadataBuilder.persistentSettings(settings); + } + if (diff.isTemplatesMetadataUpdated()) { + TemplatesMetadata templatesMetadata = remoteGlobalMetadataManager.getTemplatesMetadata(clusterName, manifest.getClusterUUID(), manifest.getTemplatesMetadata().getUploadedFilename()); + metadataBuilder.templates(templatesMetadata); + } + for (String customType : diff.getCustomMetadataUpdated().keySet()) { + Metadata.Custom custom = remoteGlobalMetadataManager.getCustomsMetadata(clusterName, manifest.getClusterUUID(), manifest.getCustomMetadataMap().get(customType).getUploadedFilename(), customType); + metadataBuilder.putCustom(customType, custom); + } + return clusterStateBuilder.metadata(metadataBuilder).build(); + } + /** * Fetch the previous cluster UUIDs from remote state store and return the most recent valid cluster UUID * diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java index 9141085b6d5a2..12fb618152639 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java @@ -179,7 +179,7 @@ Metadata getGlobalMetadata(String clusterName, String clusterUUID, ClusterMetada } } - private CoordinationMetadata getCoordinationMetadata(String clusterName, String clusterUUID, String coordinationMetadataFileName) { + public CoordinationMetadata getCoordinationMetadata(String clusterName, String clusterUUID, String coordinationMetadataFileName) { try { // Fetch Coordination metadata if (coordinationMetadataFileName != null) { @@ -200,7 +200,7 @@ private CoordinationMetadata getCoordinationMetadata(String clusterName, String } } - private Settings getSettingsMetadata(String clusterName, String clusterUUID, String settingsMetadataFileName) { + public Settings getSettingsMetadata(String clusterName, String clusterUUID, String settingsMetadataFileName) { try { // Fetch Settings metadata if (settingsMetadataFileName != null) { @@ -221,7 +221,7 @@ private Settings getSettingsMetadata(String clusterName, String clusterUUID, Str } } - private TemplatesMetadata getTemplatesMetadata(String clusterName, String clusterUUID, String templatesMetadataFileName) { + public TemplatesMetadata getTemplatesMetadata(String clusterName, String clusterUUID, String templatesMetadataFileName) { try { // Fetch Templates metadata if (templatesMetadataFileName != null) { @@ -242,7 +242,7 @@ private TemplatesMetadata getTemplatesMetadata(String clusterName, String cluste } } - private Metadata.Custom getCustomsMetadata(String clusterName, String clusterUUID, String customMetadataFileName, String custom) { + public Metadata.Custom getCustomsMetadata(String clusterName, String clusterUUID, String customMetadataFileName, String custom) { requireNonNull(customMetadataFileName); try { // Fetch Custom metadata diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java index 3523216c8c83f..6f67d6d3285dd 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java @@ -227,6 +227,11 @@ public Optional getLatestClusterMetadataManifest(String return latestManifestFileName.map(s -> fetchRemoteClusterMetadataManifest(clusterName, clusterUUID, s)); } + public Optional getClusterMetadataManifestByTermVersion(String clusterName, String clusterUUID, long term, long version) { + Optional manifestFileName = getManifestFileNameByTermVersion(clusterName, clusterUUID, term, version); + return manifestFileName.map(s -> fetchRemoteClusterMetadataManifest(clusterName, clusterUUID, s)); + } + /** * Fetch ClusterMetadataManifest from remote state store * @@ -313,7 +318,7 @@ private void setMetadataManifestUploadTimeout(TimeValue newMetadataManifestUploa * @param limit max no of files to fetch * @return all manifest file names */ - private List getManifestFileNames(String clusterName, String clusterUUID, int limit) throws IllegalStateException { + private List getManifestFileNames(String clusterName, String clusterUUID, String filePrefix, int limit) throws IllegalStateException { try { /* @@ -322,7 +327,7 @@ private List getManifestFileNames(String clusterName, String clust when sorted in LEXICOGRAPHIC order the latest uploaded manifest file comes on top. */ return manifestContainer(clusterName, clusterUUID).listBlobsByPrefixInSortedOrder( - MANIFEST_FILE_PREFIX + DELIMITER, + filePrefix, limit, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC ); @@ -345,6 +350,15 @@ static String getManifestFileName(long term, long version, boolean committed, in ); } + static String getManifestFilePrefixForTermVersion(long term, long version) { + return String.join( + DELIMITER, + MANIFEST_FILE_PREFIX, + RemoteStoreUtils.invertLong(term), + RemoteStoreUtils.invertLong(version) + ) + DELIMITER; + } + /** * Fetch latest ClusterMetadataManifest file from remote state store * @@ -353,11 +367,21 @@ static String getManifestFileName(long term, long version, boolean committed, in * @return latest ClusterMetadataManifest filename */ private Optional getLatestManifestFileName(String clusterName, String clusterUUID) throws IllegalStateException { - List manifestFilesMetadata = getManifestFileNames(clusterName, clusterUUID, 1); + List manifestFilesMetadata = getManifestFileNames(clusterName, clusterUUID, MANIFEST_FILE_PREFIX + DELIMITER, 1); if (manifestFilesMetadata != null && !manifestFilesMetadata.isEmpty()) { return Optional.of(manifestFilesMetadata.get(0).name()); } logger.info("No manifest file present in remote store for cluster name: {}, cluster UUID: {}", clusterName, clusterUUID); return Optional.empty(); } + + private Optional getManifestFileNameByTermVersion(String clusterName, String clusterUUID, long term, long version) throws IllegalStateException { + final String filePrefix = getManifestFilePrefixForTermVersion(term, version); + List manifestFilesMetadata = getManifestFileNames(clusterName, clusterUUID, filePrefix, 1); + if (manifestFilesMetadata != null && !manifestFilesMetadata.isEmpty()) { + return Optional.of(manifestFilesMetadata.get(0).name()); + } + logger.info("No manifest file present in remote store for cluster name: {}, cluster UUID: {}, term: {}, version: {}", clusterName, clusterUUID, term, version); + return Optional.empty(); + } } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 7fa2b6c8ff497..70204a8e46a28 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1133,7 +1133,8 @@ protected Node( rerouteService, fsHealthService, persistedStateRegistry, - remoteStoreNodeService + remoteStoreNodeService, + remoteClusterStateService ); final SearchPipelineService searchPipelineService = new SearchPipelineService( clusterService, diff --git a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java index d94f3fb304fe2..e1fffb3a49163 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java @@ -270,7 +270,8 @@ protected void onSendRequest( ElectionStrategy.DEFAULT_INSTANCE, nodeHealthService, persistedStateRegistry, - Mockito.mock(RemoteStoreNodeService.class) + Mockito.mock(RemoteStoreNodeService.class), + null ); transportService.start(); transportService.acceptIncomingRequests(); diff --git a/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java b/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java index 6d94054afdea2..94b9f8b71c362 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java @@ -76,7 +76,8 @@ public void testDiffSerializationFailure() { transportService, writableRegistry(), pu -> null, - (pu, l) -> {} + (pu, l) -> {}, + null ); transportService.start(); transportService.acceptIncomingRequests(); diff --git a/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java b/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java index b33ebf8333b36..0b951c1927c71 100644 --- a/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java +++ b/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java @@ -128,7 +128,8 @@ private DiscoveryModule newModule(Settings settings, List plugi mock(RerouteService.class), null, new PersistedStateRegistry(), - remoteStoreNodeService + remoteStoreNodeService, + null ); } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 4326e5fc63961..2eaa19e8cd7d0 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -2551,7 +2551,8 @@ public void start(ClusterState initialState) { ElectionStrategy.DEFAULT_INSTANCE, () -> new StatusInfo(HEALTHY, "healthy-info"), persistedStateRegistry, - remoteStoreNodeService + remoteStoreNodeService, + null ); clusterManagerService.setClusterStatePublisher(coordinator); coordinator.start(); diff --git a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java index 0754cc1793dc8..0cf9858be7632 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -1180,7 +1180,8 @@ protected Optional getDisruptableMockTransport(Transpo getElectionStrategy(), nodeHealthService, persistedStateRegistry, - remoteStoreNodeService + remoteStoreNodeService, + null ); clusterManagerService.setClusterStatePublisher(coordinator); final GatewayService gatewayService = new GatewayService(