Writable warm replica replication/recovery #17390
base: main
Conversation
❌ Gradle check result for 6fde641: FAILURE. Please examine the workflow log, locate and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
@@ -238,7 +238,8 @@ private IndexMetadata.Builder updateInSyncAllocations(
     allocationId = RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID;
 } else {
     assert (recoverySource instanceof RecoverySource.SnapshotRecoverySource
-        || recoverySource instanceof RecoverySource.RemoteStoreRecoverySource) : recoverySource;
+        || recoverySource instanceof RecoverySource.RemoteStoreRecoverySource
+        || recoverySource instanceof RecoverySource.ExistingStoreRecoverySource) : recoverySource;
Why is this required, and what will be the implication?
On search role nodes, when recovery happens for existing remote indexes, it was assumed that recovery would happen from snapshot restore. See here.
We've changed this flow for warm indices: recovery for indices with an EXISTING_STORE source will go through the remote store. Without this change the recovery would be treated as EMPTY_STORE.
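For reference, a rough sketch of the flow change described here, assuming the isStoreLocalityPartial() index setting seen elsewhere in this PR marks a warm index; recoverFromRemoteStore is a hypothetical helper name, not the exact method used:

// Sketch only: route EXISTING_STORE recovery of a warm index through the remote store
// instead of letting it degrade to an EMPTY_STORE recovery.
if (recoverySource instanceof RecoverySource.ExistingStoreRecoverySource
        && indexSettings.isStoreLocalityPartial()) {
    // Warm shards keep only a partial local cache, so the authoritative data
    // lives in the remote store and recovery must hydrate from there.
    recoverFromRemoteStore(indexShard);   // hypothetical helper
} else if (recoverySource instanceof RecoverySource.RemoteStoreRecoverySource) {
    recoverFromRemoteStore(indexShard);
}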
We are weakening the assertion here. For ExistingStoreRecoverySource, it can only be a tiered index, but here we do not/cannot check for that.
server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
@@ -190,8 +190,10 @@ public CacheStats stats() {
 public void logCurrentState() {
     int i = 0;
     for (RefCountedCache<K, V> cache : table) {
-        logger.trace("SegmentedCache " + i);
-        ((LRUCache<K, V>) cache).logCurrentState();
+        if (cache.size() > 0) {
Can we handle the size() == 0 case inside logCurrentState?
Done
As this is for logging purposes, skipping logging of empty cache details.
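For clarity, a minimal sketch of the agreed logging behavior based on the diff above (the i++ bookkeeping is assumed and may differ from the final code):

public void logCurrentState() {
    int i = 0;
    for (RefCountedCache<K, V> cache : table) {
        // Skip empty segments: logging them adds noise without useful detail.
        if (cache.size() > 0) {
            logger.trace("SegmentedCache " + i);
            ((LRUCache<K, V>) cache).logCurrentState();
        }
        i++;
    }
}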
logger.trace("Composite Directory[{}]: Local Directory files - {}", this::toString, () -> Arrays.toString(localFiles)); | ||
logger.trace("Composite Directory[{}]: Remote Directory files - {}", this::toString, () -> Arrays.toString(remoteFiles)); | ||
// logger.trace("Composite Directory[{}]: Remote Directory files - {}", this::toString, () -> Arrays.toString(remoteFiles)); | ||
Set<String> nonBlockLuceneFiles = allFiles.stream() |
Why are we returning just the nonBlockLuceneFiles here? Won't the list be incomplete, since some files can get evicted?
Remote will have files that are marked as block files. Meanwhile, we will evaluate the listAll() implementation for CompositeDirectory(). Some tests are flaky with this implementation; I've muted them in this PR.
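For context, a rough sketch of the listAll() shape being discussed: combine local and remote file names, then filter out block files using the BLOCK_FILE_SUFFIX constant added in this PR (helper names follow the snippets quoted elsewhere in this review; not the final implementation):

@Override
public String[] listAll() throws IOException {
    ensureOpen();
    Set<String> allFiles = new HashSet<>(Arrays.asList(localDirectory.listAll()));
    allFiles.addAll(Arrays.asList(getRemoteFiles()));
    // Block files ("<name>_block_<n>") are a local caching detail, so only the
    // logical Lucene files are reported to callers.
    String[] nonBlockLuceneFiles = allFiles.stream()
        .filter(f -> f.contains(FileTypeUtils.BLOCK_FILE_SUFFIX) == false)
        .toArray(String[]::new);
    return nonBlockLuceneFiles;
}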
Signed-off-by: Sandeep Kumawat <[email protected]>
Force-pushed 719a67d to 1963b58 (Compare)
server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
❌ Gradle check result for 1963b58: FAILURE. Please examine the workflow log, locate and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
Force-pushed 1963b58 to 3ce8970 (Compare)
❌ Gradle check result for 3ce8970: FAILURE. Please examine the workflow log, locate and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
Force-pushed 3ce8970 to 505a59c (Compare)
❌ Gradle check result for 505a59c: FAILURE. Please examine the workflow log, locate and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK)) {
    FileChunkRequest req = (FileChunkRequest) request;
    logger.debug("file chunk [{}] lastChunk: {}", req, req.lastChunk());
    if (req.name().endsWith("cfs") && req.lastChunk()) {
Where are we doing the file copy?
@@ -0,0 +1,2087 @@
/*
With Warm, only RemoteStoreSegmentReplication is supported, so we can rename the file to WarmIndexReplication/Recovery etc. and segregate the functions as well.
.../src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java
server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
// For warm indices, it doesn't make sense to check for local existence of files.
overrideLocal = overrideLocal && indexSettings.isStoreLocalityPartial() == false;
For remote restore, we always do this. I don't think we should change that behavior, as we want to be able to go back in time.
This makes sense. Even for a partial-locality index it is safe to override the files. Thanks for this comment.
if (indexSettings.isStoreLocalityPartial() == false) {
    assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
        : "There should not be any segments file in the dir";
}
A better way to do it: assert ... || indexSettings.isStoreLocalityPartial()
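Spelled out, the suggestion would look roughly like this (a sketch; noneMatch expresses the same check as the original filter/findAny/isEmpty chain):

assert indexSettings.isStoreLocalityPartial()
    || Arrays.stream(store.directory().listAll()).noneMatch(f -> f.startsWith(IndexFileNames.SEGMENTS))
    : "There should not be any segments file in the dir";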
logger.info("The file [{}] exist in local but not part of FileCache, deleting it from local", name); | ||
localDirectory.deleteFile(name); | ||
} else { | ||
fileCache.remove(getFilePath(name)); |
Where are the block files removed?
We are using listLocalFiles() to check whether localDirectory contains the file. listLocalFiles() returns block files as well, so when delete is called for block files, we delete them in this method.
Added logic to delete block files as well.
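For illustration, a hedged sketch of what the block-file cleanup could look like inside deleteFile, reusing listBlockFiles() and the two branches quoted above (isCached is a hypothetical check for FileCache membership, not an existing method):

@Override
public void deleteFile(String name) throws IOException {
    ensureOpen();
    // Delete the logical file plus every block file derived from it.
    for (String localFile : listBlockFiles(name)) {
        Path filePath = getFilePath(localFile);
        if (isCached(filePath) == false) {   // hypothetical: file on disk but never tracked by FileCache
            logger.info("The file [{}] exist in local but not part of FileCache, deleting it from local", localFile);
            localDirectory.deleteFile(localFile);
        } else {
            fileCache.remove(filePath);      // assumed: cache removal also cleans up the on-disk copy
        }
    }
}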
server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java
server/src/main/java/org/opensearch/index/store/remote/utils/cache/SegmentedCache.java
Signed-off-by: Sandeep Kumawat <[email protected]>
Force-pushed 505a59c to 8339c0f (Compare)
❌ Gradle check result for 8339c0f: FAILURE. Please examine the workflow log, locate and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
@@ -18,12 +18,14 @@
@ExperimentalApi
public class FileTypeUtils {

    public static String BLOCK_FILE_SUFFIX = "_block_";
nit: BLOCK_FILE_IDENTIFIER (since it is not really a suffix).
 // TODO: Revisit listAll() implementation, Check if we should include the remote files as well.
 @Override
 public String[] listAll() throws IOException {
     ensureOpen();
     logger.trace("Composite Directory[{}]: listAll() called", this::toString);
     String[] localFiles = localDirectory.listAll();
-    Set<String> allFiles = new HashSet<>(Arrays.asList(localFiles));
     String[] remoteFiles = getRemoteFiles();
+    Set<String> allFiles = new HashSet<>(Arrays.asList(localFiles));
These changes are not required, please remove them.
 * Returns names of all files stored in local directory
 * @throws IOException in case of I/O error
 */
protected String[] listLocalFiles() throws IOException {
Should be private. We do not want to expose local- or remote-related distinctions from this class.
 * Returns names of all block files stored in local directory for a given file
 * @throws IOException in case of I/O error
 */
protected List<String> listBlockFiles(String fileName) throws IOException {
Should be private as well. Also, since this will include the full file too if it is present locally, should we consider renaming it to listLocalFiles as well (without a parameter it would return all local files; with a file name parameter it would return all local files related to that file)?
I think this will become confusing. I believe we should call it listBlockFiles() only and explain in the docs that it will return all the block files as well as that file itself.
String[] allFiles = listLocalFiles();
List<String> blockFiles = new ArrayList<>();

// Add the original file if it exists
if (Arrays.asList(allFiles).contains(fileName)) {
    blockFiles.add(fileName);
}

// Find and add all block files
String prefix = fileName + FileTypeUtils.BLOCK_FILE_SUFFIX;
for (String file : allFiles) {
    if (file.startsWith(prefix)) {
        blockFiles.add(file);
    }
}
return blockFiles;
}
Nit:
protected List<String> listBlockFiles(String fileName) throws IOException {
return Stream.of(listLocalFiles())
.filter(file -> file.equals(fileName) || file.startsWith(fileName + FileTypeUtils.BLOCK_FILE_SUFFIX))
.collect(Collectors.toList());
}
} else if (Arrays.asList(listAll()).contains(name) == false) {
    throw new NoSuchFileException("File " + name + " not found in directory");
Why are we removing this? If we get a delete call for a non-existent file, we should throw a NoSuchFileException by contract.
I understand your concern. Let me explain why we are making this change:
In CompositeDirectory, we have both local and remote files.
Lucene uses listAll() to get files and then calls deleteFile(). But in our case, a file may exist in remote even when it is not in local. So throwing NoSuchFileException just because the file is not in local is not correct.
I think we should do the following:
We will throw NoSuchFileException only when the file is in neither local nor remote.
If the file is in either place (local or remote), we will proceed with the deletion.
This way, we are not reporting the file as missing when it actually exists in remote, while still following Lucene's contract of throwing the exception for truly missing files.
What do you think about this?
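A sketch of the proposed contract in code form (not the final implementation), relying on listAll() covering both local and remote files as discussed; deleteLocalCopiesAndBlocks is a hypothetical placeholder for the cleanup described earlier:

@Override
public void deleteFile(String name) throws IOException {
    ensureOpen();
    // listAll() spans both local and remote files, so this only throws for files
    // that are missing everywhere, which keeps Lucene's contract intact.
    if (Arrays.asList(listAll()).contains(name) == false) {
        throw new NoSuchFileException("File " + name + " not found in directory");
    }
    deleteLocalCopiesAndBlocks(name);   // hypothetical helper for the local/cache cleanup
}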
public void testlistBlockFiles() throws IOException {
    List<String> actualFileNames = compositeDirectory.listBlockFiles(FILE_PRESENT_LOCALLY);
    String[] expectedFileNames = new String[] { "_1.cfe", "_1.cfe_block_0", "_1.cfe_block_1", };
    assertArrayEquals(expectedFileNames, actualFileNames.toArray());
}

public void testLocalFiles() throws IOException {
    String[] actualFileNames = compositeDirectory.listLocalFiles();
    String[] expectedFileNames = new String[] {
        "_0.cfe_block_7",
        "_0.cfs_block_7",
        "_1.cfe",
        "_1.cfe_block_0",
        "_1.cfe_block_1",
        "_2.cfe",
        "temp_file.tmp" };
    assertArrayEquals(expectedFileNames, actualFileNames);
}
Separate tests are not needed for these functions; adding more cases to testDeleteFile should suffice, as these will be private functions used only in the delete call.
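Along those lines, a possible extra case for testDeleteFile (file names follow the test fixture quoted above; access to localDirectory in the test class is assumed):

public void testDeleteFileAlsoRemovesBlockFiles() throws IOException {
    compositeDirectory.deleteFile("_1.cfe");
    // The logical file and all of its cached block files should be gone locally.
    List<String> localFiles = Arrays.asList(localDirectory.listAll());
    assertFalse(localFiles.contains("_1.cfe"));
    assertFalse(localFiles.contains("_1.cfe_block_0"));
    assertFalse(localFiles.contains("_1.cfe_block_1"));
}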
// Return an empty list for warm indices. In this case, replica shards don't need to download files from remote storage,
// as they will sync all files from remote in case of failure.
if (indexShard.indexSettings().isStoreLocalityPartial() == true) {
nit: remove == true.
Description
This PR aims to implement the replica replication and recovery flow for indices with partial store locality (also known as writable warm indices), which were introduced with the composite directory in PR #12782. It builds upon and addresses comments received on the pending PR #14670.
For writable warm indices, where primary shards now consist of both a localDirectory and a remoteDirectory, we've implemented file uploads to remote storage and maintain an LRU fileCache on the node instead of storing all files locally.
We've made several changes to the replication and recovery processes.
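As a conceptual illustration of the warm-index storage model described above (plain Java, not OpenSearch's actual FileCache API): the remote store holds the authoritative copy of every segment, while local files behave like entries in an access-ordered LRU cache.

import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model: file path -> size, evicting the least recently used local copy
// once the cache holds more than maxFiles entries.
class LruFileCacheSketch extends LinkedHashMap<Path, Long> {
    private final int maxFiles;

    LruFileCacheSketch(int maxFiles) {
        super(16, 0.75f, true);   // access order: reads refresh recency
        this.maxFiles = maxFiles;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<Path, Long> eldest) {
        // In the real design the evicted local file would be deleted from disk;
        // the authoritative copy remains in the remote store.
        return size() > maxFiles;
    }
}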
Related Issues
Resolves #13647
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.