-
Notifications
You must be signed in to change notification settings - Fork 383
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[#5361] improvment(hadoop-catalog): Introduce timeout mechanism to get Hadoop File System. #5406
Changes from 8 commits
58efcf6
77785e9
859d8ac
2217682
42c5dce
563c66a
fc961e5
b08fc9c
ec0ef6e
2164e5c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,6 +31,8 @@ | |
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
import org.apache.commons.lang3.StringUtils; | ||
import org.apache.gravitino.Catalog; | ||
import org.apache.gravitino.Entity; | ||
|
@@ -71,6 +73,8 @@ | |
import org.apache.hadoop.fs.FileStatus; | ||
import org.apache.hadoop.fs.FileSystem; | ||
import org.apache.hadoop.fs.Path; | ||
import org.awaitility.Awaitility; | ||
import org.awaitility.core.ConditionTimeoutException; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
|
@@ -755,6 +759,34 @@ FileSystem getFileSystem(Path path, Map<String, String> config) throws IOExcepti | |
scheme, path, fileSystemProvidersMap.keySet(), fileSystemProvidersMap.values())); | ||
} | ||
|
||
return provider.getFileSystem(path, config); | ||
int timeoutSeconds = | ||
(int) | ||
propertiesMetadata | ||
.catalogPropertiesMetadata() | ||
.getOrDefault( | ||
config, HadoopCatalogPropertiesMetadata.FILESYSTEM_CONNECTION_TIMEOUT_SECONDS); | ||
try { | ||
AtomicReference<FileSystem> fileSystem = new AtomicReference<>(); | ||
Awaitility.await() | ||
.atMost(timeoutSeconds, TimeUnit.SECONDS) | ||
.until( | ||
() -> { | ||
fileSystem.set(provider.getFileSystem(path, config)); | ||
return true; | ||
}); | ||
return fileSystem.get(); | ||
} catch (ConditionTimeoutException e) { | ||
throw new IOException( | ||
String.format( | ||
"Failed to get FileSystem for path: %s, scheme: %s, provider: %s, config: %s within %s seconds, please check the configuration or increase the " | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This line exceeds the 100 characters length, please break into lines. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fix |
||
+ "file system connection timeout time by setting catalog property: %s", | ||
path, | ||
scheme, | ||
provider, | ||
config, | ||
timeoutSeconds, | ||
HadoopCatalogPropertiesMetadata.FILESYSTEM_CONNECTION_TIMEOUT_SECONDS), | ||
e); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -53,6 +53,10 @@ public class HadoopCatalogPropertiesMetadata extends BaseCatalogPropertiesMetada | |
*/ | ||
public static final String DEFAULT_FS_PROVIDER = "default-filesystem-provider"; | ||
|
||
static final String FILESYSTEM_CONNECTION_TIMEOUT_SECONDS = | ||
"filesystem-connection-timeout-seconds"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The configuration name is too long, can you make it shorter. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about "filesystem-conn-timeout-sec"? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I take it. |
||
static final int DEFAULT_GET_FILESYSTEM_TIMEOUT_SECONDS = 6; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why it is the "6" seconds we defined here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is just an experience value. This specific setting depends on the environment at that time. I believe 6s is long enough to get the connection. |
||
|
||
public static final String BUILTIN_LOCAL_FS_PROVIDER = "builtin-local"; | ||
public static final String BUILTIN_HDFS_FS_PROVIDER = "builtin-hdfs"; | ||
|
||
|
@@ -82,6 +86,14 @@ public class HadoopCatalogPropertiesMetadata extends BaseCatalogPropertiesMetada | |
false /* immutable */, | ||
BUILTIN_LOCAL_FS_PROVIDER, // please see LocalFileSystemProvider#name() | ||
false /* hidden */)) | ||
.put( | ||
FILESYSTEM_CONNECTION_TIMEOUT_SECONDS, | ||
PropertyEntry.integerOptionalPropertyEntry( | ||
FILESYSTEM_CONNECTION_TIMEOUT_SECONDS, | ||
"Timeout to wait for to create the Hadoop file system client instance.", | ||
false /* immutable */, | ||
DEFAULT_GET_FILESYSTEM_TIMEOUT_SECONDS, | ||
false /* hidden */)) | ||
// The following two are about authentication. | ||
.putAll(KERBEROS_PROPERTY_ENTRIES) | ||
.putAll(AuthenticationConfig.AUTHENTICATION_PROPERTY_ENTRIES) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why it is so time-consuming to initialize the filesystem client?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the user sets an incorrect endpoint, the client will retry to get the connection for a certain amount of time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think you really fix this problem without using another thread to create a FS and polling the status asynchronously?