Skip to content

Commit

Permalink
Draft changes for Diff manifest upload
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
shiv0408 committed May 14, 2024
1 parent 87625d3 commit a10b062
Show file tree
Hide file tree
Showing 13 changed files with 660 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,31 +77,26 @@ public void testRemoteCleanupTaskUpdated() {
RemoteClusterStateService.class
);

assertEquals(
5,
remoteClusterStateService.getStaleFileDeletionTask().getInterval().getMinutes()
);
assertEquals(5, remoteClusterStateService.getStaleFileDeletionTask().getInterval().getMinutes());
assertTrue(remoteClusterStateService.getStaleFileDeletionTask().isScheduled());

// now disable
client().admin().cluster().prepareUpdateSettings()
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), -1))
.get();

assertEquals(
-1,
remoteClusterStateService.getStaleFileDeletionTask().getInterval().getMillis()
);
assertEquals(-1, remoteClusterStateService.getStaleFileDeletionTask().getInterval().getMillis());
assertFalse(remoteClusterStateService.getStaleFileDeletionTask().isScheduled());

// now set Clean up interval to 1 min
client().admin().cluster().prepareUpdateSettings()
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), "1m"))
.get();
assertEquals(
1,
remoteClusterStateService.getStaleFileDeletionTask().getInterval().getMinutes()
);
assertEquals(1, remoteClusterStateService.getStaleFileDeletionTask().getInterval().getMinutes());
}

public void testRemoteCleanupOnlyAfter10Updates() throws Exception {
Expand All @@ -116,7 +111,9 @@ public void testRemoteCleanupOnlyAfter10Updates() throws Exception {
);

// set cleanup interval to 1 min
client().admin().cluster().prepareUpdateSettings()
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), "1m"))
.get();

Expand All @@ -136,20 +133,29 @@ public void testRemoteCleanupOnlyAfter10Updates() throws Exception {
BlobPath manifestContainerPath = baseMetadataPath.add("manifest");

assertBusy(() -> {
assertEquals(RETAINED_MANIFESTS - 1, repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size());
assertEquals(
RETAINED_MANIFESTS - 1,
repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size()
);
}, 1, TimeUnit.MINUTES);

replicaCount = updateReplicaCountNTimes(8, replicaCount);

// wait for 1 min, to ensure that clean up task ran and didn't clean up stale files because it was less than 10
Thread.sleep(60000);
assertNotEquals(RETAINED_MANIFESTS - 1, repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size());
assertNotEquals(
RETAINED_MANIFESTS - 1,
repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size()
);

// Do 2 more updates, now since the total successful state changes are more than 10, stale files will be cleaned up
replicaCount = updateReplicaCountNTimes(2, replicaCount);

assertBusy(() -> {
assertEquals(RETAINED_MANIFESTS - 1, repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size());
assertEquals(
RETAINED_MANIFESTS - 1,
repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size()
);
}, 1, TimeUnit.MINUTES);

Map<String, IndexMetadata> indexMetadataMap = remoteClusterStateService.getLatestClusterState(
Expand Down Expand Up @@ -299,7 +305,8 @@ private void setReplicaCount(int replicaCount) {
}

private int updateReplicaCountNTimes(int n, int initialCount) {
int newReplicaCount = randomIntBetween(0, 3);;
int newReplicaCount = randomIntBetween(0, 3);
;
for (int i = 0; i < n; i++) {
while (newReplicaCount == initialCount) {
newReplicaCount = randomIntBetween(0, 3);
Expand Down
30 changes: 2 additions & 28 deletions server/src/main/java/org/opensearch/cluster/ClusterState.java
Original file line number Diff line number Diff line change
Expand Up @@ -496,38 +496,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}

if (metrics.contains(Metric.BLOCKS)) {
builder.startObject("blocks");

if (blocks().global().isEmpty() == false) {
builder.startObject("global");
for (ClusterBlock block : blocks().global()) {
block.toXContent(builder, params);
}
builder.endObject();
}

if (blocks().indices().isEmpty() == false) {
builder.startObject("indices");
for (final Map.Entry<String, Set<ClusterBlock>> entry : blocks().indices().entrySet()) {
builder.startObject(entry.getKey());
for (ClusterBlock block : entry.getValue()) {
block.toXContent(builder, params);
}
builder.endObject();
}
builder.endObject();
}

builder.endObject();
blocks().toXContent(builder, params);
}

// nodes
if (metrics.contains(Metric.NODES)) {
builder.startObject("nodes");
for (DiscoveryNode node : nodes) {
node.toXContent(builder, params);
}
builder.endObject();
nodes.toXContent(builder, params);
}

// meta data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,15 @@

import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.repositories.RepositoryOperation;

import java.io.IOException;
Expand Down Expand Up @@ -101,13 +104,46 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.startObject();
{
builder.field("repository", entry.repository);
if (params.param(Metadata.CONTEXT_MODE_PARAM, Metadata.CONTEXT_MODE_API).equals(Metadata.CONTEXT_MODE_GATEWAY)) {
builder.field("repository_state_id", entry.repositoryStateId);
} // else we don't serialize it
}
builder.endObject();
}
builder.endArray();
return builder;
}

public static RepositoryCleanupInProgress fromXContent(XContentParser parser) throws IOException {
if (parser.currentToken() == null) {
parser.nextToken();
}
XContentParserUtils.ensureFieldName(parser, parser.currentToken(), TYPE);
parser.nextToken();
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.currentToken(), parser);
List<Entry> entries = new ArrayList<>();
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
String repository = null;
long repositoryStateId = -1L;
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.currentToken(), parser);
String currentFieldName = parser.currentName();
parser.nextToken();
if ("repository".equals(currentFieldName)) {
repository = parser.text();
} else if ("repository_state_id".equals(currentFieldName)) {
// only XContent parsed with {@link Metadata.CONTEXT_MODE_GATEWAY} will have the repository state id and can be deserialized
repositoryStateId = parser.longValue();
} else {
throw new IllegalArgumentException("unknown field [" + currentFieldName + "]");
}
}
entries.add(new Entry(repository, repositoryStateId));
}
return new RepositoryCleanupInProgress(entries);
}

@Override
public String toString() {
return Strings.toString(MediaTypeRegistry.JSON, this);
Expand Down
Loading

0 comments on commit a10b062

Please sign in to comment.