diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 94a445a703953..e42343a6e8380 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -70,7 +70,6 @@
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConcurrentWriteOperationDetectedException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.FileSystemOperationUnhandledException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidFileSystemPropertyException;
@@ -762,53 +761,8 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
final TracingContext tracingContext) throws IOException {
AbfsRestOperation op;
AbfsClient createClient = getClientHandler().getIngressClient();
- try {
- // Trigger a create with overwrite=false first so that eTag fetch can be
- // avoided for cases when no pre-existing file is present (major portion
- // of create file traffic falls into the case of no pre-existing file).
- op = createClient.createPath(relativePath, true, false, permissions,
- isAppendBlob, null, contextEncryptionAdapter, tracingContext);
-
- } catch (AbfsRestOperationException e) {
- if (e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
- // File pre-exists, fetch eTag
- try {
- op = getClient().getPathStatus(relativePath, false, tracingContext, null);
- } catch (AbfsRestOperationException ex) {
- if (ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
- // Is a parallel access case, as file which was found to be
- // present went missing by this request.
- throw new ConcurrentWriteOperationDetectedException(
- "Parallel access to the create path detected. Failing request "
- + "to honor single writer semantics");
- } else {
- throw ex;
- }
- }
-
- String eTag = extractEtagHeader(op.getResult());
-
- try {
- // overwrite only if eTag matches with the file properties fetched befpre
- op = createClient.createPath(relativePath, true, true, permissions,
- isAppendBlob, eTag, contextEncryptionAdapter, tracingContext);
- } catch (AbfsRestOperationException ex) {
- if (ex.getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED) {
- // Is a parallel access case, as file with eTag was just queried
- // and precondition failure can happen only when another file with
- // different etag got created.
- throw new ConcurrentWriteOperationDetectedException(
- "Parallel access to the create path detected. Failing request "
- + "to honor single writer semantics");
- } else {
- throw ex;
- }
- }
- } else {
- throw e;
- }
- }
-
+ op = createClient.conditionalCreateOverwriteFile(relativePath, statistics,
+ permissions, isAppendBlob, contextEncryptionAdapter, tracingContext);
return op;
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ConcurrentWriteOperationDetectedException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ConcurrentWriteOperationDetectedException.java
index 79813ddfe6400..ec42f43ffdb30 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ConcurrentWriteOperationDetectedException.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ConcurrentWriteOperationDetectedException.java
@@ -20,12 +20,29 @@
/**
* Thrown when a concurrent write operation is detected.
+ * This exception is used to indicate that parallel access to the create path
+ * has been detected, which violates the single writer semantics.
*/
@org.apache.hadoop.classification.InterfaceAudience.Public
@org.apache.hadoop.classification.InterfaceStability.Evolving
public class ConcurrentWriteOperationDetectedException
extends AzureBlobFileSystemException {
+ private static final String ERROR_MESSAGE = "Parallel access to the create path detected. Failing request "
+ + "to honor single writer semantics";
+
+ /**
+ * Constructs a new ConcurrentWriteOperationDetectedException with a default error message.
+ */
+ public ConcurrentWriteOperationDetectedException() {
+ super(ERROR_MESSAGE);
+ }
+
+ /**
+ * Constructs a new ConcurrentWriteOperationDetectedException with the specified error message.
+ *
+ * @param message the detail message.
+ */
public ConcurrentWriteOperationDetectedException(String message) {
super(message);
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
index 89416acc9d5b6..b54ce1a4dac7e 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
@@ -28,6 +28,7 @@
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
+import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
@@ -49,6 +50,7 @@
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
@@ -61,6 +63,7 @@
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConcurrentWriteOperationDetectedException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
@@ -80,6 +83,7 @@
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static java.net.HttpURLConnection.HTTP_OK;
import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
+import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_GET_FILE_STATUS;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ACQUIRE_LEASE_ACTION;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.AND_MARK;
@@ -455,80 +459,84 @@ public void createNonRecursivePreCheck(Path parentPath,
/**
* Get Rest Operation for API
* Put Blob.
- * Creates a file or directory(marker file) at specified path.
- * @param path of the directory to be created.
- * @param tracingContext for tracing the service call.
- * @return executed rest operation containing response from server.
- * @throws AzureBlobFileSystemException if rest operation fails.
+ * Creates a file or directory (marker file) at the specified path.
+ *
+ * @param path the path of the directory to be created.
+ * @param isFileCreation whether the path to create is a file.
+ * @param overwrite whether to overwrite if the path already exists.
+ * @param permissions the permissions to set on the path.
+ * @param isAppendBlob whether the path is an append blob.
+ * @param eTag the eTag of the path.
+ * @param contextEncryptionAdapter the context encryption adapter.
+ * @param tracingContext the tracing context.
+ * @return the executed rest operation containing the response from the server.
+ * @throws AzureBlobFileSystemException if the rest operation fails.
*/
@Override
public AbfsRestOperation createPath(final String path,
- final boolean isFile,
+ final boolean isFileCreation,
final boolean overwrite,
final AzureBlobFileSystemStore.Permissions permissions,
final boolean isAppendBlob,
final String eTag,
final ContextEncryptionAdapter contextEncryptionAdapter,
final TracingContext tracingContext) throws AzureBlobFileSystemException {
- return createPath(path, isFile, overwrite, permissions, isAppendBlob, eTag,
- contextEncryptionAdapter, tracingContext, false);
+ AbfsRestOperation op;
+ if (isFileCreation) {
+ // Create a file with the specified parameters
+ op = createFile(path, overwrite, permissions, isAppendBlob, eTag,
+ contextEncryptionAdapter, tracingContext);
+ } else {
+ // Create a directory with the specified parameters
+ op = createDirectory(path, permissions, isAppendBlob, eTag,
+ contextEncryptionAdapter, tracingContext);
+ }
+ return op;
+ }
+
+ /**
+ * Creates a marker at the specified path.
+ *
+ * @param path the path where the marker is to be created.
+ * @param eTag the eTag of the path.
+ * @param contextEncryptionAdapter the context encryption adapter.
+ * @param tracingContext the tracing context for the service call.
+ *
+ * @return the created AbfsRestOperation.
+ *
+ * @throws AzureBlobFileSystemException if an error occurs during the operation.
+ */
+ protected AbfsRestOperation createMarkerAtPath(final String path,
+ final String eTag,
+ final ContextEncryptionAdapter contextEncryptionAdapter,
+ final TracingContext tracingContext) throws AzureBlobFileSystemException {
+ return createPathRestOp(path, false, false, false, eTag,
+ contextEncryptionAdapter, tracingContext);
}
/**
* Get Rest Operation for API
- * Put Blob.
+ * Put Blob.
* Creates a file or directory (marker file) at the specified path.
*
* @param path the path of the directory to be created.
* @param isFile whether the path is a file.
* @param overwrite whether to overwrite if the path already exists.
- * @param permissions the permissions to set on the path.
* @param isAppendBlob whether the path is an append blob.
* @param eTag the eTag of the path.
* @param contextEncryptionAdapter the context encryption adapter.
- * @param tracingContext the tracing context.
- * @param isCreateCalledFromMarkers whether the create is called from markers.
+ * @param tracingContext the tracing context for the service call.
* @return the executed rest operation containing the response from the server.
* @throws AzureBlobFileSystemException if the rest operation fails.
*/
- public AbfsRestOperation createPath(final String path,
+ public AbfsRestOperation createPathRestOp(final String path,
final boolean isFile,
final boolean overwrite,
- final AzureBlobFileSystemStore.Permissions permissions,
final boolean isAppendBlob,
final String eTag,
final ContextEncryptionAdapter contextEncryptionAdapter,
- final TracingContext tracingContext,
- boolean isCreateCalledFromMarkers) throws AzureBlobFileSystemException {
+ final TracingContext tracingContext) throws AzureBlobFileSystemException {
final List requestHeaders = createDefaultHeaders();
- if (!getIsNamespaceEnabled() && !isCreateCalledFromMarkers) {
- AbfsHttpOperation op1Result = null;
- try {
- op1Result = getPathStatus(path, tracingContext,
- null, true).getResult();
- } catch (AbfsRestOperationException ex) {
- if (ex.getStatusCode() == HTTP_NOT_FOUND) {
- LOG.debug("No directory/path found: {}", path);
- } else {
- LOG.debug("Failed to get path status for: {}", path, ex);
- throw ex;
- }
- }
- if (op1Result != null) {
- boolean isDir = checkIsDir(op1Result);
- if (isFile == isDir) {
- throw new AbfsRestOperationException(HTTP_CONFLICT,
- AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(),
- PATH_EXISTS,
- null);
- }
- }
- Path parentPath = new Path(path).getParent();
- if (parentPath != null && !parentPath.isRoot()) {
- createMarkers(parentPath, overwrite, permissions, isAppendBlob, eTag,
- contextEncryptionAdapter, tracingContext);
- }
- }
if (isFile) {
addEncryptionKeyRequestHeaders(path, requestHeaders, true,
contextEncryptionAdapter, tracingContext);
@@ -555,96 +563,97 @@ public AbfsRestOperation createPath(final String path,
final AbfsRestOperation op = getAbfsRestOperation(
AbfsRestOperationType.PutBlob,
HTTP_METHOD_PUT, url, requestHeaders);
- try {
- op.execute(tracingContext);
- } catch (AzureBlobFileSystemException ex) {
- // If we have no HTTP response, throw the original exception.
- if (!op.hasResult()) {
- throw ex;
- }
- if (!isFile && op.getResult().getStatusCode() == HTTP_CONFLICT) {
- // This ensures that we don't throw ex only for existing directory but if a blob exists we throw exception.
- AbfsHttpOperation opResult = null;
- try {
- opResult = this.getPathStatus(path, true, tracingContext, null).getResult();
- } catch (AbfsRestOperationException e) {
- if (opResult != null) {
- LOG.debug("Failed to get path status for: {} during blob type check", path, e);
- throw e;
- }
- }
- if (opResult != null && checkIsDir(opResult)) {
- return op;
- }
- }
- throw ex;
- }
+ op.execute(tracingContext);
return op;
}
/**
- * Creates marker blobs for the parent directories of the specified path.
+ * Conditionally creates or overwrites a file at the specified relative path.
+ * This method ensures that the file is created or overwritten based on the provided parameters.
*
- * @param path The path for which parent directories need to be created.
- * @param overwrite A flag indicating whether existing directories should be overwritten.
- * @param permissions The permissions to be set for the created directories.
- * @param isAppendBlob A flag indicating whether the created blob should be of type APPEND_BLOB.
- * @param eTag The eTag to be matched for conditional requests.
- * @param contextEncryptionAdapter The encryption adapter for context encryption.
- * @param tracingContext The tracing context for the operation.
- * @throws AzureBlobFileSystemException If the creation of any parent directory fails.
- */
- private void createMarkers(final Path path,
- final boolean overwrite,
- final AzureBlobFileSystemStore.Permissions permissions,
- final boolean isAppendBlob,
- final String eTag,
- final ContextEncryptionAdapter contextEncryptionAdapter,
- final TracingContext tracingContext) throws AzureBlobFileSystemException {
- ArrayList keysToCreateAsFolder = new ArrayList<>();
- checkParentChainForFile(path, tracingContext,
- keysToCreateAsFolder);
- for (Path pathToCreate : keysToCreateAsFolder) {
- createPath(pathToCreate.toUri().getPath(), false, overwrite, permissions,
- isAppendBlob, eTag, contextEncryptionAdapter, tracingContext, true);
- }
- }
-
- /**
- * Checks for the entire parent hierarchy and returns if any directory exists and
- * throws an exception if any file exists.
- * @param path path to check the hierarchy for.
- * @param tracingContext the tracingcontext.
+ * @param relativePath The relative path of the file to be created or overwritten.
+ * @param statistics The file system statistics to be updated.
+ * @param permissions The permissions to be set on the file.
+ * @param isAppendBlob Specifies if the file is an append blob.
+ * @param contextEncryptionAdapter The encryption context adapter for handling encryption.
+ * @param tracingContext The tracing context for tracking the operation.
+ * @return An AbfsRestOperation object containing the result of the operation.
+ * @throws IOException If an I/O error occurs during the operation.
*/
- private void checkParentChainForFile(Path path, TracingContext tracingContext,
- List keysToCreateAsFolder) throws AzureBlobFileSystemException {
- AbfsHttpOperation opResult = null;
- Path current = path;
- do {
- try {
- opResult = getPathStatus(current.toUri().getPath(),
- tracingContext, null, false).getResult();
- } catch (AbfsRestOperationException ex) {
- if (ex.getStatusCode() == HTTP_NOT_FOUND) {
- LOG.debug("No explicit directory/path found: {}", current);
- } else {
- LOG.debug("Exception occurred while getting path status: {}", current, ex);
- throw ex;
- }
- }
- boolean isDirectory = opResult != null && checkIsDir(opResult);
- if (opResult != null && !isDirectory) {
+ @Override
+ public AbfsRestOperation conditionalCreateOverwriteFile(String relativePath,
+ FileSystem.Statistics statistics,
+ AzureBlobFileSystemStore.Permissions permissions,
+ boolean isAppendBlob,
+ ContextEncryptionAdapter contextEncryptionAdapter,
+ TracingContext tracingContext) throws IOException {
+ if (!getIsNamespaceEnabled()) {
+ // Check for non-empty directory at the path. The only pending validation is the check for an explicitly empty directory,
+ // which is performed later to optimize TPS by delaying the lookup only if create with overwrite=false fails.
+ if (isNonEmptyDirectory(relativePath, tracingContext)) {
throw new AbfsRestOperationException(HTTP_CONFLICT,
AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(),
PATH_EXISTS,
null);
}
- if (isDirectory) {
- return;
+ // Create markers for the parent hierarchy.
+ tryMarkerCreation(relativePath, isAppendBlob, null,
+ contextEncryptionAdapter, tracingContext);
+ }
+ AbfsRestOperation op;
+ try {
+ // Trigger a creation with overwrite=false first so that eTag fetch can be
+ // avoided for cases when no pre-existing file is present (major portion
+ // of create file traffic falls into the case of no pre-existing file).
+ op = createPathRestOp(relativePath, true, false,
+ isAppendBlob, null, contextEncryptionAdapter, tracingContext);
+ } catch (AbfsRestOperationException e) {
+ if (e.getStatusCode() == HTTP_CONFLICT) {
+ // File pre-exists, fetch eTag
+ try {
+ op = getPathStatus(relativePath, tracingContext, null, false);
+ } catch (AbfsRestOperationException ex) {
+ if (ex.getStatusCode() == HTTP_NOT_FOUND) {
+ // Is a parallel access case, as file which was found to be
+ // present went missing by this request.
+ throw new ConcurrentWriteOperationDetectedException("Parallel access to the create path detected. Failing request "
+ + "as the path which existed before gives not found error");
+ } else {
+ throw ex;
+ }
+ }
+
+ // If present as an explicit empty directory, we should throw conflict exception.
+ boolean isExplicitDir = checkIsDir(op.getResult());
+ if (isExplicitDir) {
+ throw new AbfsRestOperationException(HTTP_CONFLICT,
+ AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(),
+ PATH_EXISTS,
+ null);
+ }
+
+ String eTag = extractEtagHeader(op.getResult());
+
+ try {
+ // overwrite only if eTag matches with the file properties fetched before
+ op = createPathRestOp(relativePath, true, true,
+ isAppendBlob, eTag, contextEncryptionAdapter, tracingContext);
+ } catch (AbfsRestOperationException ex) {
+ if (ex.getStatusCode() == HTTP_PRECON_FAILED) {
+ // Is a parallel access case, as file with eTag was just queried
+ // and precondition failure can happen only when another file with
+ // different etag got created.
+ throw new ConcurrentWriteOperationDetectedException("Parallel access to the create path detected. Failing request "
+ + "due to precondition failure");
+ } else {
+ throw ex;
+ }
+ }
+ } else {
+ throw e;
}
- keysToCreateAsFolder.add(current);
- current = current.getParent();
- } while (current != null && !current.isRoot());
+ }
+ return op;
}
/**
@@ -808,7 +817,7 @@ destination, sourceEtag, isAtomicRenameKey(source), tracingContext
final AbfsRestOperation successOp = getAbfsRestOperation(
AbfsRestOperationType.RenamePath, HTTP_METHOD_PUT,
url, requestHeaders);
- successOp.hardSetResult(HttpURLConnection.HTTP_OK);
+ successOp.hardSetResult(HTTP_OK);
return new AbfsClientRenameResult(successOp, true, false);
} else {
throw new AbfsRestOperationException(HTTP_INTERNAL_ERROR,
@@ -1114,10 +1123,10 @@ public AbfsRestOperation setPathProperties(final String path,
throw ex;
}
// This path could be present as an implicit directory in FNS.
- if (op.getResult().getStatusCode() == HTTP_NOT_FOUND && isNonEmptyListing(path, tracingContext)) {
+ if (op.getResult().getStatusCode() == HTTP_NOT_FOUND && isNonEmptyDirectory(path, tracingContext)) {
// Implicit path found, create a marker blob at this path and set properties.
- this.createPath(path, false, false, null, false, null,
- contextEncryptionAdapter, tracingContext, false);
+ this.createPathRestOp(path, false, false, false, null,
+ contextEncryptionAdapter, tracingContext);
// Make sure hdi_isFolder is added to the list of properties to be set.
boolean hdiIsFolderExists = properties.containsKey(XML_TAG_HDI_ISFOLDER);
if (!hdiIsFolderExists) {
@@ -1197,7 +1206,7 @@ public AbfsRestOperation getPathStatus(final String path,
}
// This path could be present as an implicit directory in FNS.
if (op.getResult().getStatusCode() == HTTP_NOT_FOUND
- && isImplicitCheckRequired && isNonEmptyListing(path, tracingContext)) {
+ && isImplicitCheckRequired && isNonEmptyDirectory(path, tracingContext)) {
// Implicit path found.
AbfsRestOperation successOp = getAbfsRestOperation(
AbfsRestOperationType.GetPathStatus,
@@ -1302,7 +1311,7 @@ public AbfsRestOperation deletePath(final String path,
final AbfsRestOperation successOp = getAbfsRestOperation(
AbfsRestOperationType.DeletePath, HTTP_METHOD_DELETE,
url, requestHeaders);
- successOp.hardSetResult(HttpURLConnection.HTTP_OK);
+ successOp.hardSetResult(HTTP_OK);
return successOp;
} else {
throw new AbfsRestOperationException(HTTP_INTERNAL_ERROR,
@@ -1532,8 +1541,8 @@ public boolean checkIsDir(AbfsHttpOperation result) {
@Override
public boolean checkUserError(int responseStatusCode) {
return (responseStatusCode >= HttpURLConnection.HTTP_BAD_REQUEST
- && responseStatusCode < HttpURLConnection.HTTP_INTERNAL_ERROR
- && responseStatusCode != HttpURLConnection.HTTP_CONFLICT);
+ && responseStatusCode < HTTP_INTERNAL_ERROR
+ && responseStatusCode != HTTP_CONFLICT);
}
/**
@@ -1746,7 +1755,7 @@ public void takeGetPathStatusAtomicRenameKeyAction(final Path path,
return;
}
} catch (AbfsRestOperationException ex) {
- if (ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
+ if (ex.getStatusCode() == HTTP_NOT_FOUND) {
return;
}
throw ex;
@@ -1756,7 +1765,7 @@ public void takeGetPathStatusAtomicRenameKeyAction(final Path path,
try {
RenameAtomicity renameAtomicity = getRedoRenameAtomicity(
pendingJsonPath, Integer.parseInt(pendingJsonFileStatus.getResult()
- .getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)),
+ .getResponseHeader(CONTENT_LENGTH)),
tracingContext);
renameAtomicity.redo();
renameSrcHasChanged = false;
@@ -1768,8 +1777,8 @@ public void takeGetPathStatusAtomicRenameKeyAction(final Path path,
* to a HTTP_CONFLICT. In this case, no more operation needs to be taken, and
* the calling getPathStatus can return this source path as result.
*/
- if (ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND
- || ex.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
+ if (ex.getStatusCode() == HTTP_NOT_FOUND
+ || ex.getStatusCode() == HTTP_CONFLICT) {
renameSrcHasChanged = true;
} else {
throw ex;
@@ -1819,8 +1828,8 @@ private boolean takeListPathAtomicRenameKeyAction(final Path path,
* since this is a renamePendingJson file and would be deleted by the redo operation,
* the calling listPath should not return this json path as result.
*/
- if (ex.getStatusCode() != HttpURLConnection.HTTP_NOT_FOUND
- && ex.getStatusCode() != HttpURLConnection.HTTP_CONFLICT) {
+ if (ex.getStatusCode() != HTTP_NOT_FOUND
+ && ex.getStatusCode() != HTTP_CONFLICT) {
throw ex;
}
}
@@ -1943,7 +1952,7 @@ private BlobListResultSchema getListResultSchemaFromPathStatus(String relativePa
entrySchema.setContentLength(Long.parseLong(pathStatus.getResult().getResponseHeader(CONTENT_LENGTH)));
entrySchema.setLastModifiedTime(
pathStatus.getResult().getResponseHeader(LAST_MODIFIED));
- entrySchema.setETag(AzureBlobFileSystemStore.extractEtagHeader(pathStatus.getResult()));
+ entrySchema.setETag(extractEtagHeader(pathStatus.getResult()));
// If listing is done on explicit directory, do not include directory in the listing.
if (!entrySchema.isDirectory()) {
@@ -1961,12 +1970,22 @@ private static String encodeMetadataAttribute(String value)
private static String decodeMetadataAttribute(String encoded)
throws UnsupportedEncodingException {
return encoded == null ? null
- : java.net.URLDecoder.decode(encoded, StandardCharsets.UTF_8.name());
+ : URLDecoder.decode(encoded, StandardCharsets.UTF_8.name());
}
- private boolean isNonEmptyListing(String path,
+ /**
+ * Checks if the listing of the specified path is non-empty.
+ *
+ * @param path The path to be listed.
+ * @param tracingContext The tracing context for tracking the operation.
+ * @return True if the listing is non-empty, False otherwise.
+ * @throws AzureBlobFileSystemException If an error occurs during the listing operation.
+ */
+ @VisibleForTesting
+ public boolean isNonEmptyDirectory(String path,
TracingContext tracingContext) throws AzureBlobFileSystemException {
- AbfsRestOperation listOp = listPath(path, false, 1, null, tracingContext, false);
+ AbfsRestOperation listOp = listPath(path, false, 1, null, tracingContext,
+ false);
return !isEmptyListResults(listOp.getResult());
}
@@ -2006,4 +2025,248 @@ public static String generateBlockListXml(List blockIds) {
stringBuilder.append(String.format(BLOCK_LIST_END_TAG));
return stringBuilder.toString();
}
+
+ /**
+ * Checks if the specified path exists as a directory.
+ *
+ * @param path the path of the directory to check.
+ * @param tracingContext the tracing context for the service call.
+ * @return true if the directory exists, false otherwise.
+ * @throws AzureBlobFileSystemException if the rest operation fails.
+ */
+ private boolean isExistingDirectory(String path,
+ TracingContext tracingContext)
+ throws AzureBlobFileSystemException {
+ // Check if the directory contains any entries by listing its contents.
+ if (isNonEmptyDirectory(path, tracingContext)) {
+ // If the list result schema has any paths, it is a directory.
+ return true;
+ } else {
+ // If the directory does not contain any entries, check if it exists as an empty directory.
+ return isEmptyDirectory(path, tracingContext, true);
+ }
+ }
+
+ /**
+ * Checks the status of the path to determine if it exists and whether it is a file or directory.
+ * Throws an exception if the path exists as a file.
+ *
+ * @param path the path to check
+ * @param tracingContext the tracing context
+ * @return true if the path exists and is a directory, false otherwise
+ * @throws AbfsRestOperationException if the path exists as a file
+ */
+ private boolean isEmptyDirectory(final String path,
+ final TracingContext tracingContext, boolean isDirCheck) throws AzureBlobFileSystemException {
+ // If the call is to create a directory, there are 3 possible cases:
+ // a) a file exists at that path
+ // b) an empty directory exists
+ // c) the path does not exist.
+ AbfsRestOperation getPathStatusOp = null;
+ try {
+ // GetPathStatus call to check if path already exists.
+ getPathStatusOp = getPathStatus(path, tracingContext, null, false);
+ } catch (AbfsRestOperationException ex) {
+ if (ex.getStatusCode() != HTTP_NOT_FOUND) {
+ throw ex;
+ }
+ }
+ if (getPathStatusOp != null) {
+ // If path exists and is a directory, return true.
+ boolean isDirectory = checkIsDir(getPathStatusOp.getResult());
+ if (!isDirectory && isDirCheck) {
+ // This indicates path exists as a file, hence throw conflict.
+ throw new AbfsRestOperationException(HTTP_CONFLICT,
+ AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(),
+ PATH_EXISTS,
+ null);
+ } else {
+ return isDirectory;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Creates a successful AbfsRestOperation for the given path.
+ *
+ * @param path the path for which the operation is created.
+ * @return the created AbfsRestOperation with a hard set result of HTTP_CREATED.
+ * @throws AzureBlobFileSystemException if an error occurs during the operation creation.
+ */
+ private AbfsRestOperation createSuccessResponse(String path) throws AzureBlobFileSystemException {
+ final AbfsRestOperation successOp = getAbfsRestOperation(
+ AbfsRestOperationType.PutBlob,
+ HTTP_METHOD_PUT, createRequestUrl(path, EMPTY_STRING),
+ createDefaultHeaders());
+ successOp.hardSetResult(HttpURLConnection.HTTP_CREATED);
+ return successOp;
+ }
+
+ /**
+ * Creates a directory at the specified path.
+ *
+ * @param path the path of the directory to be created.
+ * @param permissions the permissions to be set for the directory.
+ * @param isAppendBlob whether the directory is an append blob.
+ * @param eTag the eTag of the directory.
+ * @param contextEncryptionAdapter the encryption context adapter.
+ * @param tracingContext the tracing context for the service call.
+ * @return the executed rest operation containing the response from the server.
+ * @throws AzureBlobFileSystemException if the rest operation fails.
+ */
+ private AbfsRestOperation createDirectory(final String path,
+ final AzureBlobFileSystemStore.Permissions permissions,
+ final boolean isAppendBlob,
+ final String eTag,
+ final ContextEncryptionAdapter contextEncryptionAdapter,
+ final TracingContext tracingContext) throws AzureBlobFileSystemException {
+ if (!getIsNamespaceEnabled()) {
+ try {
+ if (isExistingDirectory(path, tracingContext)) {
+ // we return a dummy success response and save TPS if directory already exists.
+ return createSuccessResponse(path);
+ }
+ } catch (AzureBlobFileSystemException ex) {
+ LOG.error("Path exists as file {} : {}", path, ex.getMessage());
+ throw ex;
+ }
+ tryMarkerCreation(path, isAppendBlob, eTag,
+ contextEncryptionAdapter, tracingContext);
+ }
+ return createPathRestOp(path, false, true,
+ isAppendBlob, eTag, contextEncryptionAdapter, tracingContext);
+ }
+
+ /**
+ * Creates a file at the specified path.
+ *
+ * @param path the path of the file to be created.
+ * @param overwrite whether to overwrite if the file already exists.
+ * @param permissions the permissions to set on the file.
+ * @param isAppendBlob whether the file is an append blob.
+ * @param eTag the eTag of the file.
+ * @param contextEncryptionAdapter the context encryption adapter.
+ * @param tracingContext the tracing context for the service call.
+ * @return the executed rest operation containing the response from the server.
+ * @throws AzureBlobFileSystemException if the rest operation fails.
+ */
+ private AbfsRestOperation createFile(final String path,
+ final boolean overwrite,
+ final AzureBlobFileSystemStore.Permissions permissions,
+ final boolean isAppendBlob,
+ final String eTag,
+ final ContextEncryptionAdapter contextEncryptionAdapter,
+ final TracingContext tracingContext) throws AzureBlobFileSystemException {
+ if (!getIsNamespaceEnabled()) {
+ // Check if non-empty directory already exists at that path.
+ if (isNonEmptyDirectory(path, tracingContext)) {
+ throw new AbfsRestOperationException(HTTP_CONFLICT,
+ AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(),
+ PATH_EXISTS,
+ null);
+ }
+ // If the overwrite flag is true, we must verify whether an empty directory exists at the specified path.
+ // However, if overwrite is false, we can skip this validation and proceed with blob creation,
+ // which will fail with a conflict error if a file or directory already exists at the path.
+ if (overwrite && isEmptyDirectory(path, tracingContext, false)) {
+ throw new AbfsRestOperationException(HTTP_CONFLICT,
+ AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(),
+ PATH_EXISTS,
+ null);
+ }
+ tryMarkerCreation(path, isAppendBlob, eTag,
+ contextEncryptionAdapter, tracingContext);
+ }
+ return createPathRestOp(path, true, overwrite,
+ isAppendBlob, eTag, contextEncryptionAdapter, tracingContext);
+ }
+
+ /**
+ * Retrieves the list of marker paths to be created for the specified path.
+ *
+ * @param path The path for which marker paths need to be created.
+ * @param tracingContext The tracing context for the operation.
+ * @return A list of paths that need to be created as markers.
+ * @throws AzureBlobFileSystemException If an error occurs while finding parent paths for marker creation.
+ */
+ @VisibleForTesting
+ public List getMarkerPathsTobeCreated(final Path path,
+ final TracingContext tracingContext) throws AzureBlobFileSystemException {
+ ArrayList keysToCreateAsFolder = new ArrayList<>();
+ findParentPathsForMarkerCreation(path, tracingContext, keysToCreateAsFolder);
+ return keysToCreateAsFolder;
+ }
+
+ /**
+ * Creates marker blobs for the parent directories of the specified path.
+ *
+ * @param path The path for which parent directories need to be created.
+ * @param isAppendBlob A flag indicating whether the created blob should be of type APPEND_BLOB.
+ * @param eTag The eTag to be matched for conditional requests.
+ * @param contextEncryptionAdapter The encryption adapter for context encryption.
+ * @param tracingContext The tracing context for the operation.
+ *
+ * @throws AzureBlobFileSystemException If the creation of any parent directory fails.
+ */
+ @VisibleForTesting
+ public void tryMarkerCreation(String path,
+ boolean isAppendBlob,
+ String eTag,
+ ContextEncryptionAdapter contextEncryptionAdapter,
+ TracingContext tracingContext) throws AzureBlobFileSystemException {
+ Path parentPath = new Path(path).getParent();
+ if (parentPath != null && !parentPath.isRoot()) {
+ List keysToCreateAsFolder = getMarkerPathsTobeCreated(parentPath,
+ tracingContext);
+ for (Path pathToCreate : keysToCreateAsFolder) {
+ try {
+ createPathRestOp(pathToCreate.toUri().getPath(), false, false,
+ isAppendBlob, eTag, contextEncryptionAdapter, tracingContext);
+ } catch (AbfsRestOperationException e) {
+ LOG.debug("Swallow exception for failed marker creation");
+ }
+ }
+ }
+ }
+
+ /**
+ * Checks for the entire parent hierarchy and returns if any directory exists and
+ * throws an exception if any file exists.
+ * @param path path to check the hierarchy for.
+ * @param tracingContext the tracingcontext.
+ */
+ private void findParentPathsForMarkerCreation(Path path, TracingContext tracingContext,
+ List keysToCreateAsFolder) throws AzureBlobFileSystemException {
+ AbfsHttpOperation opResult = null;
+ Path current = path;
+ do {
+ try {
+ opResult = getPathStatus(current.toUri().getPath(),
+ tracingContext, null, false).getResult();
+ } catch (AbfsRestOperationException ex) {
+ if (ex.getStatusCode() == HTTP_NOT_FOUND) {
+ LOG.debug("No explicit directory/path found: {}", current);
+ } else {
+ LOG.debug("Exception occurred while getting path status: {}", current, ex);
+ throw ex;
+ }
+ }
+ if (opResult == null) {
+ keysToCreateAsFolder.add(current);
+ current = current.getParent();
+ continue;
+ }
+ if (checkIsDir(opResult)) {
+ // Explicit directory found, return from here.
+ return;
+ } else {
+ // File found hence throw exception.
+ throw new AbfsRestOperationException(HTTP_CONFLICT,
+ AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(),
+ PATH_EXISTS,
+ null);
+ }
+ } while (current != null && !current.isRoot());
+ }
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index 47ae988419c85..b6aca80768249 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -47,6 +47,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.Permissions;
@@ -552,6 +553,26 @@ public abstract AbfsRestOperation createPath(String path,
ContextEncryptionAdapter contextEncryptionAdapter,
TracingContext tracingContext) throws AzureBlobFileSystemException;
+ /**
+ * Conditionally creates or overwrites a file at the specified relative path.
+ * This method ensures that the file is created or overwritten based on the provided parameters.
+ *
+ * @param relativePath The relative path of the file to be created or overwritten.
+ * @param statistics The file system statistics to be updated.
+ * @param permissions The permissions to be set on the file.
+ * @param isAppendBlob Specifies if the file is an append blob.
+ * @param contextEncryptionAdapter The encryption context adapter for handling encryption.
+ * @param tracingContext The tracing context for tracking the operation.
+ * @return An AbfsRestOperation object containing the result of the operation.
+ * @throws IOException If an I/O error occurs during the operation.
+ */
+ public abstract AbfsRestOperation conditionalCreateOverwriteFile(String relativePath,
+ FileSystem.Statistics statistics,
+ Permissions permissions,
+ boolean isAppendBlob,
+ ContextEncryptionAdapter contextEncryptionAdapter,
+ TracingContext tracingContext) throws IOException;
+
/**
* Performs a pre-check for a createNonRecursivePreCheck operation. Checks if parentPath
* exists or not.
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
index 71b89147017d7..8505b533ce4c4 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
@@ -40,6 +40,7 @@
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
@@ -50,6 +51,7 @@
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConcurrentWriteOperationDetectedException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidFileSystemPropertyException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
@@ -431,6 +433,71 @@ public void createNonRecursivePreCheck(Path parentPath,
}
}
+ /**
+ * Conditionally creates or overwrites a file at the specified relative path.
+ * This method ensures that the file is created or overwritten based on the provided parameters.
+ *
+ * @param relativePath The relative path of the file to be created or overwritten.
+ * @param statistics The file system statistics to be updated.
+ * @param permissions The permissions to be set on the file.
+ * @param isAppendBlob Specifies if the file is an append blob.
+ * @param contextEncryptionAdapter The encryption context adapter for handling encryption.
+ * @param tracingContext The tracing context for tracking the operation.
+ * @return An AbfsRestOperation object containing the result of the operation.
+ * @throws IOException If an I/O error occurs during the operation.
+ */
+ public AbfsRestOperation conditionalCreateOverwriteFile(String relativePath,
+ FileSystem.Statistics statistics,
+ AzureBlobFileSystemStore.Permissions permissions,
+ boolean isAppendBlob,
+ ContextEncryptionAdapter contextEncryptionAdapter,
+ TracingContext tracingContext) throws IOException {
+ AbfsRestOperation op;
+ try {
+ // Trigger a create with overwrite=false first so that eTag fetch can be
+ // avoided for cases when no pre-existing file is present (major portion
+ // of create file traffic falls into the case of no pre-existing file).
+ op = createPath(relativePath, true, false, permissions,
+ isAppendBlob, null, contextEncryptionAdapter, tracingContext);
+
+ } catch (AbfsRestOperationException e) {
+ if (e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
+ // File pre-exists, fetch eTag
+ try {
+ op = getPathStatus(relativePath, false, tracingContext, null);
+ } catch (AbfsRestOperationException ex) {
+ if (ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
+ // Is a parallel access case, as file which was found to be
+ // present went missing by this request.
+ throw new ConcurrentWriteOperationDetectedException();
+ } else {
+ throw ex;
+ }
+ }
+
+ String eTag = extractEtagHeader(op.getResult());
+
+ try {
+ // overwrite only if eTag matches with the file properties fetched befpre
+ op = createPath(relativePath, true, true, permissions,
+ isAppendBlob, eTag, contextEncryptionAdapter, tracingContext);
+ } catch (AbfsRestOperationException ex) {
+ if (ex.getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED) {
+ // Is a parallel access case, as file with eTag was just queried
+ // and precondition failure can happen only when another file with
+ // different etag got created.
+ throw new ConcurrentWriteOperationDetectedException();
+ } else {
+ throw ex;
+ }
+ }
+ } else {
+ throw e;
+ }
+ }
+ return op;
+ }
+
/**
* Get Rest Operation for API
*
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobDeleteHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobDeleteHandler.java
index 50bdb96c9e7d6..d93bafb676cb3 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobDeleteHandler.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobDeleteHandler.java
@@ -20,6 +20,9 @@
import java.util.concurrent.atomic.AtomicInteger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
@@ -38,6 +41,9 @@
*/
public class BlobDeleteHandler extends ListActionTaker {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ BlobDeleteHandler.class);
+
private final Path path;
private final boolean recursive;
@@ -151,18 +157,12 @@ private void ensurePathParentExist()
throws AzureBlobFileSystemException {
if (!path.isRoot() && !path.getParent().isRoot()) {
try {
- getAbfsClient().createPath(path.getParent().toUri().getPath(),
- false,
- false,
- null,
- false,
+ getAbfsClient().createMarkerAtPath(path.getParent().toUri().getPath(),
null,
null,
tracingContext);
} catch (AbfsRestOperationException ex) {
- if (ex.getStatusCode() != HTTP_CONFLICT) {
- throw ex;
- }
+ LOG.debug("Marker creation failed for parent path {} ", path.getParent().toUri().getPath());
}
}
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java
index 4c27752243f4d..695c0694cf533 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java
@@ -127,11 +127,14 @@ public boolean execute() throws AzureBlobFileSystemException {
RenameAtomicity renameAtomicity = null;
if (pathInformation.getIsDirectory()
&& pathInformation.getIsImplicit()) {
- AbfsRestOperation createMarkerOp = getAbfsClient().createPath(
- src.toUri().getPath(),
- false, false, null,
- false, null, null, tracingContext);
- pathInformation.setETag(extractEtagHeader(createMarkerOp.getResult()));
+ try {
+ AbfsRestOperation createMarkerOp = getAbfsClient().createMarkerAtPath(
+ src.toUri().getPath(), null, null, tracingContext);
+ pathInformation.setETag(
+ extractEtagHeader(createMarkerOp.getResult()));
+ } catch (AbfsRestOperationException ex) {
+ LOG.debug("Marker creation failed for src path {} ", src.toUri().getPath());
+ }
}
try {
if (isAtomicRename) {
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
index 7e6789e0a139d..4003da49e5147 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
@@ -21,9 +21,12 @@
import java.io.IOException;
import java.net.URI;
import java.util.Hashtable;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import org.assertj.core.api.Assertions;
import org.junit.After;
@@ -700,4 +703,34 @@ protected void assertPathDns(Path path) {
.describedAs("Path does not contain expected DNS")
.contains(expectedDns);
}
+
+ /**
+ * Checks a list of futures for exceptions.
+ *
+ * This method iterates over a list of futures, waits for each task to complete,
+ * and handles any exceptions thrown by the lambda expressions. If a
+ * RuntimeException is caught, it increments the exceptionCaught counter.
+ * If an unexpected exception is caught, it prints the exception to the standard error.
+ * Finally, it asserts that no RuntimeExceptions were caught.
+ *
+ * @param futures The list of futures to check for exceptions.
+ */
+ protected void checkFuturesForExceptions(List> futures, int exceptionVal) {
+ int exceptionCaught = 0;
+ for (Future> future : futures) {
+ try {
+ future.get(); // wait for the task to complete and handle any exceptions thrown by the lambda expression
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof RuntimeException) {
+ exceptionCaught++;
+ } else {
+ System.err.println("Unexpected exception caught: " + cause);
+ }
+ } catch (InterruptedException e) {
+ // handle interruption
+ }
+ }
+ assertEquals(exceptionCaught, exceptionVal);
+ }
}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
index 0de2e40ce3d8c..e66afbcaa7492 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
@@ -96,8 +96,8 @@ public void testAbfsHttpSendStatistics() throws IOException {
// 1 create request = 1 connection made and 1 send request
if (client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs)) {
expectedRequestsSent += (directory);
- // Per directory, we have 2 calls :- GetBlobProperties and PutBlob and 1 ListBlobs call (implicit check) for the path.
- expectedConnectionsMade += ((directory * 2) + 1);
+ // Per directory, we have 2 calls :- 1 PutBlob and 1 ListBlobs call.
+ expectedConnectionsMade += ((directory * 2));
} else {
expectedRequestsSent++;
expectedConnectionsMade++;
@@ -176,12 +176,12 @@ public void testAbfsHttpSendStatistics() throws IOException {
* + getFileStatus to fetch the file ETag
* + create overwrite=true
* = 3 connections and 2 send requests in case of Dfs Client
- * = 7 connections (5 GBP and 2 PutBlob calls) in case of Blob Client
+ * = 1 ListBlob + 2 GPS + 2 PutBlob
*/
if (fs.getAbfsStore().getAbfsConfiguration().isConditionalCreateOverwriteEnabled()) {
if (client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs)) {
expectedRequestsSent += 2;
- expectedConnectionsMade += 7;
+ expectedConnectionsMade += 5;
} else {
expectedConnectionsMade += 3;
expectedRequestsSent += 2;
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java
index dfbeb793cd163..73a0d01105753 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java
@@ -29,7 +29,6 @@
import java.util.List;
import java.util.Random;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -510,36 +509,6 @@ public void testRecreateDirectoryAppendAndFlush() throws IOException {
}
}
- /**
- * Checks a list of futures for exceptions.
- *
- * This method iterates over a list of futures, waits for each task to complete,
- * and handles any exceptions thrown by the lambda expressions. If a
- * RuntimeException is caught, it increments the exceptionCaught counter.
- * If an unexpected exception is caught, it prints the exception to the standard error.
- * Finally, it asserts that no RuntimeExceptions were caught.
- *
- * @param futures The list of futures to check for exceptions.
- */
- private void checkFuturesForExceptions(List> futures, int exceptionVal) {
- int exceptionCaught = 0;
- for (Future> future : futures) {
- try {
- future.get(); // wait for the task to complete and handle any exceptions thrown by the lambda expression
- } catch (ExecutionException e) {
- Throwable cause = e.getCause();
- if (cause instanceof RuntimeException) {
- exceptionCaught++;
- } else {
- System.err.println("Unexpected exception caught: " + cause);
- }
- } catch (InterruptedException e) {
- // handle interruption
- }
- }
- assertEquals(exceptionCaught, exceptionVal);
- }
-
/**
* Verify that parallel write with same offset from different output streams will not throw exception.
**/
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
index 81b86088c561f..b3314894d3f99 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
@@ -21,19 +21,24 @@
import java.io.FileNotFoundException;
import java.io.FilterOutputStream;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
+import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import org.assertj.core.api.Assertions;
-import org.junit.Assume;
import org.junit.Test;
import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
@@ -51,6 +56,7 @@
import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientHandler;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient;
@@ -71,14 +77,21 @@
import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_MKDIR_OVERWRITE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -221,9 +234,9 @@ private AzureBlobFileSystem createJsonFile(Path path, Path renameJson) throws IO
final AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem());
assumeBlobServiceType();
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
- Mockito.doReturn(store).when(fs).getAbfsStore();
+ doReturn(store).when(fs).getAbfsStore();
AbfsClient client = Mockito.spy(store.getClient());
- Mockito.doReturn(client).when(store).getClient();
+ doReturn(client).when(store).getClient();
fs.setWorkingDirectory(new Path(ROOT_PATH));
fs.mkdirs(new Path(path, "test3"));
AzureBlobFileSystemStore.VersionedFileStatus fileStatus
@@ -391,27 +404,28 @@ public void testTryWithResources() throws Throwable {
*/
@Test
public void testFilterFSWriteAfterClose() throws Throwable {
- final AzureBlobFileSystem fs = getFileSystem();
- Path testPath = new Path(TEST_FOLDER_PATH, TEST_CHILD_FILE);
- FSDataOutputStream out = fs.create(testPath);
- intercept(IOException.class,
- () -> {
- try (FilterOutputStream fos = new FilterOutputStream(out)) {
- byte[] bytes = new byte[8*ONE_MB];
- fos.write(bytes);
- fos.write(bytes);
- fos.flush();
- out.hsync();
- fs.delete(testPath, false);
- // trigger the first failure
- throw intercept(IOException.class,
- () -> {
- fos.write('b');
- out.hsync();
- return "hsync didn't raise an IOE";
- });
- }
- });
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ Path testPath = new Path(TEST_FOLDER_PATH, TEST_CHILD_FILE);
+ FSDataOutputStream out = fs.create(testPath);
+ intercept(IOException.class,
+ () -> {
+ try (FilterOutputStream fos = new FilterOutputStream(out)) {
+ byte[] bytes = new byte[8 * ONE_MB];
+ fos.write(bytes);
+ fos.write(bytes);
+ fos.flush();
+ out.hsync();
+ fs.delete(testPath, false);
+ // trigger the first failure
+ throw intercept(IOException.class,
+ () -> {
+ fos.write('b');
+ out.hsync();
+ return "hsync didn't raise an IOE";
+ });
+ }
+ });
+ }
}
/**
@@ -433,103 +447,119 @@ public void testDefaultCreateOverwriteFileTest() throws Throwable {
public void testCreateFileOverwrite(boolean enableConditionalCreateOverwrite)
throws Throwable {
- final AzureBlobFileSystem currentFs = getFileSystem();
- Configuration config = new Configuration(this.getRawConfiguration());
- config.set("fs.azure.enable.conditional.create.overwrite",
- Boolean.toString(enableConditionalCreateOverwrite));
- AzureBlobFileSystemStore store = currentFs.getAbfsStore();
- AbfsClient client = store.getClientHandler().getIngressClient();
-
- final AzureBlobFileSystem fs =
- (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(),
- config);
-
- long totalConnectionMadeBeforeTest = fs.getInstrumentationMap()
- .get(CONNECTIONS_MADE.getStatName());
-
- int createRequestCount = 0;
- final Path nonOverwriteFile = new Path("/NonOverwriteTest_FileName_"
- + UUID.randomUUID().toString());
-
- // Case 1: Not Overwrite - File does not pre-exist
- // create should be successful
- fs.create(nonOverwriteFile, false);
-
- // One request to server to create path should be issued
- // two calls added for -
- // 1. getFileStatus on DFS endpoint : 1
- // getFileStatus on Blob endpoint: 2 (Additional List blob call)
- // 2. actual create call: 1
- createRequestCount += (client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs) ? 3: 1);
-
- assertAbfsStatistics(
- CONNECTIONS_MADE,
- totalConnectionMadeBeforeTest + createRequestCount,
- fs.getInstrumentationMap());
-
- // Case 2: Not Overwrite - File pre-exists
- fs.registerListener(new TracingHeaderValidator(
- fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(),
- fs.getFileSystemId(), FSOperationType.CREATE, false, 0));
- intercept(FileAlreadyExistsException.class,
- () -> fs.create(nonOverwriteFile, false));
- fs.registerListener(null);
-
- // One request to server to create path should be issued
- // Only single tryGetFileStatus should happen
- // 1. getFileStatus on DFS endpoint : 1
- // getFileStatus on Blob endpoint: 1 (No Additional List blob call as file exists)
-
- createRequestCount += (client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs) ? 2: 1);
-
- assertAbfsStatistics(
- CONNECTIONS_MADE,
- totalConnectionMadeBeforeTest + createRequestCount,
- fs.getInstrumentationMap());
-
- final Path overwriteFilePath = new Path("/OverwriteTest_FileName_"
- + UUID.randomUUID().toString());
-
- // Case 3: Overwrite - File does not pre-exist
- // create should be successful
- fs.create(overwriteFilePath, true);
-
- /// One request to server to create path should be issued
- // two calls added for -
- // 1. getFileStatus on DFS endpoint : 1
- // getFileStatus on Blob endpoint: 2 (Additional List blob call for non-existing path)
- // 2. actual create call: 1
- createRequestCount += (client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs) ? 3: 1);
-
- assertAbfsStatistics(
- CONNECTIONS_MADE,
- totalConnectionMadeBeforeTest + createRequestCount,
- fs.getInstrumentationMap());
-
- // Case 4: Overwrite - File pre-exists
- fs.registerListener(new TracingHeaderValidator(
- fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(),
- fs.getFileSystemId(), FSOperationType.CREATE, true, 0));
- fs.create(overwriteFilePath, true);
- fs.registerListener(null);
+ try (AzureBlobFileSystem currentFs = getFileSystem()) {
+ Configuration config = new Configuration(this.getRawConfiguration());
+ config.set("fs.azure.enable.conditional.create.overwrite",
+ Boolean.toString(enableConditionalCreateOverwrite));
+ AzureBlobFileSystemStore store = currentFs.getAbfsStore();
+ AbfsClient client = store.getClientHandler().getIngressClient();
+
+ try (AzureBlobFileSystem fs =
+ (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(),
+ config)) {
+
+ long totalConnectionMadeBeforeTest = fs.getInstrumentationMap()
+ .get(CONNECTIONS_MADE.getStatName());
+
+ int createRequestCount = 0;
+ final Path nonOverwriteFile = new Path("/NonOverwriteTest_FileName_"
+ + UUID.randomUUID().toString());
+
+ // Case 1: Not Overwrite - File does not pre-exist
+ // create should be successful
+ fs.create(nonOverwriteFile, false);
+
+ // One request to server to create path should be issued
+ // two calls added for -
+ // 1. getFileStatus on DFS endpoint : 1
+ // getFileStatus on Blob endpoint: 1 ListBlobcall
+ // 2. actual create call: 1
+ createRequestCount += (
+ client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs)
+ ? 2
+ : 1);
+
+ assertAbfsStatistics(
+ CONNECTIONS_MADE,
+ totalConnectionMadeBeforeTest + createRequestCount,
+ fs.getInstrumentationMap());
+
+ // Case 2: Not Overwrite - File pre-exists
+ fs.registerListener(new TracingHeaderValidator(
+ fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(),
+ fs.getFileSystemId(), FSOperationType.CREATE, false, 0));
+ intercept(FileAlreadyExistsException.class,
+ () -> fs.create(nonOverwriteFile, false));
+ fs.registerListener(null);
+
+ // One request to server to create path should be issued
+ // Only single tryGetFileStatus should happen
+ // 1. getFileStatus on DFS endpoint : 1
+ // getFileStatus on Blob endpoint: 1 (No Additional List blob call as file exists)
+
+ createRequestCount += (
+ client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs)
+ ? 2
+ : 1);
+
+ assertAbfsStatistics(
+ CONNECTIONS_MADE,
+ totalConnectionMadeBeforeTest + createRequestCount,
+ fs.getInstrumentationMap());
+
+ final Path overwriteFilePath = new Path("/OverwriteTest_FileName_"
+ + UUID.randomUUID().toString());
+
+ // Case 3: Overwrite - File does not pre-exist
+ // create should be successful
+ fs.create(overwriteFilePath, true);
+
+ /// One request to server to create path should be issued
+ // two calls added for -
+ // 1. getFileStatus on DFS endpoint : 1
+ // getFileStatus on Blob endpoint: 1 ListBlobCall + 1 GPS
+ // 2. actual create call: 1
+ // 1 extra call when conditional overwrite is not enabled to check for empty directory
+ createRequestCount += (client instanceof AbfsBlobClient
+ && !getIsNamespaceEnabled(fs))
+ ? (enableConditionalCreateOverwrite ? 2 : 3)
+ : 1;
+
+ assertAbfsStatistics(
+ CONNECTIONS_MADE,
+ totalConnectionMadeBeforeTest + createRequestCount,
+ fs.getInstrumentationMap());
+
+ // Case 4: Overwrite - File pre-exists
+ fs.registerListener(new TracingHeaderValidator(
+ fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(),
+ fs.getFileSystemId(), FSOperationType.CREATE, true, 0));
+ fs.create(overwriteFilePath, true);
+ fs.registerListener(null);
+
+ createRequestCount += (
+ client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs)
+ ? 1
+ : 0);
+
+ // Second actual create call will hap
+ if (enableConditionalCreateOverwrite) {
+ // Three requests will be sent to server to create path,
+ // 1. create without overwrite
+ // 2. GetFileStatus to get eTag
+ // 3. create with overwrite
+ createRequestCount += 3;
+ } else {
+ createRequestCount += (client instanceof AbfsBlobClient
+ && !getIsNamespaceEnabled(fs)) ? 2 : 1;
+ }
- createRequestCount += (client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs) ? 1: 0);
-
- // Second actual create call will hap
- if (enableConditionalCreateOverwrite) {
- // Three requests will be sent to server to create path,
- // 1. create without overwrite
- // 2. GetFileStatus to get eTag
- // 3. create with overwrite
- createRequestCount += (client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs) ? 4: 3);
- } else {
- createRequestCount++;
+ assertAbfsStatistics(
+ CONNECTIONS_MADE,
+ totalConnectionMadeBeforeTest + createRequestCount,
+ fs.getInstrumentationMap());
+ }
}
-
- assertAbfsStatistics(
- CONNECTIONS_MADE,
- totalConnectionMadeBeforeTest + createRequestCount,
- fs.getInstrumentationMap());
}
/**
@@ -553,117 +583,272 @@ public void testCreateFileOverwrite(boolean enableConditionalCreateOverwrite)
public void testNegativeScenariosForCreateOverwriteDisabled()
throws Throwable {
- final AzureBlobFileSystem currentFs = getFileSystem();
- Configuration config = new Configuration(this.getRawConfiguration());
- config.set("fs.azure.enable.conditional.create.overwrite",
- Boolean.toString(true));
-
- final AzureBlobFileSystem fs =
- (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(),
- config);
-
- // Get mock AbfsClient with current config
- AbfsClient
- mockClient
- = ITestAbfsClient.getMockAbfsClient(
- fs.getAbfsStore().getClient(),
- fs.getAbfsStore().getAbfsConfiguration());
- AbfsClientHandler clientHandler = Mockito.mock(AbfsClientHandler.class);
- when(clientHandler.getIngressClient()).thenReturn(mockClient);
- when(clientHandler.getClient(Mockito.any())).thenReturn(mockClient);
-
- AzureBlobFileSystemStore abfsStore = fs.getAbfsStore();
-
- ReflectionUtils.setFinalField(AzureBlobFileSystemStore.class, abfsStore, "clientHandler", clientHandler);
- ReflectionUtils.setFinalField(AzureBlobFileSystemStore.class, abfsStore, "client", mockClient);
- boolean isNamespaceEnabled = abfsStore
- .getIsNamespaceEnabled(getTestTracingContext(fs, false));
-
- AbfsRestOperation successOp = mock(
- AbfsRestOperation.class);
- AbfsHttpOperation http200Op = mock(
- AbfsHttpOperation.class);
- when(http200Op.getStatusCode()).thenReturn(HTTP_OK);
- when(successOp.getResult()).thenReturn(http200Op);
-
- AbfsRestOperationException conflictResponseEx
- = getMockAbfsRestOperationException(HTTP_CONFLICT);
- AbfsRestOperationException serverErrorResponseEx
- = getMockAbfsRestOperationException(HTTP_INTERNAL_ERROR);
- AbfsRestOperationException fileNotFoundResponseEx
- = getMockAbfsRestOperationException(HTTP_NOT_FOUND);
- AbfsRestOperationException preConditionResponseEx
- = getMockAbfsRestOperationException(HTTP_PRECON_FAILED);
-
- // mock for overwrite=false
- doThrow(conflictResponseEx) // Scn1: GFS fails with Http404
- .doThrow(conflictResponseEx) // Scn2: GFS fails with Http500
- .doThrow(
- conflictResponseEx) // Scn3: create overwrite=true fails with Http412
- .doThrow(
- conflictResponseEx) // Scn4: create overwrite=true fails with Http500
- .doThrow(
- serverErrorResponseEx) // Scn5: create overwrite=false fails with Http500
- .when(mockClient)
- .createPath(any(String.class), eq(true), eq(false),
- any(AzureBlobFileSystemStore.Permissions.class), any(boolean.class), eq(null), any(),
- any(TracingContext.class));
-
- doThrow(fileNotFoundResponseEx) // Scn1: GFS fails with Http404
- .doThrow(serverErrorResponseEx) // Scn2: GFS fails with Http500
- .doReturn(successOp) // Scn3: create overwrite=true fails with Http412
- .doReturn(successOp) // Scn4: create overwrite=true fails with Http500
- .when(mockClient)
- .getPathStatus(any(String.class), eq(false), any(TracingContext.class), nullable(
- ContextEncryptionAdapter.class));
-
- // mock for overwrite=true
- doThrow(
- preConditionResponseEx) // Scn3: create overwrite=true fails with Http412
- .doThrow(
- serverErrorResponseEx) // Scn4: create overwrite=true fails with Http500
- .when(mockClient)
- .createPath(any(String.class), eq(true), eq(true),
- any(AzureBlobFileSystemStore.Permissions.class), any(boolean.class), eq(null), any(),
- any(TracingContext.class));
-
- // Scn1: GFS fails with Http404
- // Sequence of events expected:
- // 1. create overwrite=false - fail with conflict
- // 2. GFS - fail with File Not found
- // Create will fail with ConcurrentWriteOperationDetectedException
- validateCreateFileException(ConcurrentWriteOperationDetectedException.class,
- abfsStore);
-
- // Scn2: GFS fails with Http500
- // Sequence of events expected:
- // 1. create overwrite=false - fail with conflict
- // 2. GFS - fail with Server error
- // Create will fail with 500
- validateCreateFileException(AbfsRestOperationException.class, abfsStore);
-
- // Scn3: create overwrite=true fails with Http412
- // Sequence of events expected:
- // 1. create overwrite=false - fail with conflict
- // 2. GFS - pass
- // 3. create overwrite=true - fail with Pre-Condition
- // Create will fail with ConcurrentWriteOperationDetectedException
- validateCreateFileException(ConcurrentWriteOperationDetectedException.class,
- abfsStore);
-
- // Scn4: create overwrite=true fails with Http500
- // Sequence of events expected:
- // 1. create overwrite=false - fail with conflict
- // 2. GFS - pass
- // 3. create overwrite=true - fail with Server error
- // Create will fail with 500
- validateCreateFileException(AbfsRestOperationException.class, abfsStore);
-
- // Scn5: create overwrite=false fails with Http500
- // Sequence of events expected:
- // 1. create overwrite=false - fail with server error
- // Create will fail with 500
- validateCreateFileException(AbfsRestOperationException.class, abfsStore);
+ try (AzureBlobFileSystem currentFs = getFileSystem()) {
+ Configuration config = new Configuration(this.getRawConfiguration());
+ config.set("fs.azure.enable.conditional.create.overwrite",
+ Boolean.toString(true));
+
+ try (AzureBlobFileSystem fs =
+ (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(),
+ config)) {
+
+ // Get mock AbfsClient with current config
+ AbfsClient
+ mockClient
+ = ITestAbfsClient.getMockAbfsClient(
+ fs.getAbfsStore().getClient(),
+ fs.getAbfsStore().getAbfsConfiguration());
+ AbfsClientHandler clientHandler = Mockito.mock(AbfsClientHandler.class);
+ when(clientHandler.getIngressClient()).thenReturn(mockClient);
+ when(clientHandler.getClient(Mockito.any())).thenReturn(mockClient);
+
+ AzureBlobFileSystemStore abfsStore = fs.getAbfsStore();
+
+ ReflectionUtils.setFinalField(AzureBlobFileSystemStore.class, abfsStore,
+ "clientHandler", clientHandler);
+ ReflectionUtils.setFinalField(AzureBlobFileSystemStore.class, abfsStore,
+ "client", mockClient);
+
+ AbfsRestOperation successOp = mock(
+ AbfsRestOperation.class);
+ AbfsHttpOperation http200Op = mock(
+ AbfsHttpOperation.class);
+ when(http200Op.getStatusCode()).thenReturn(HTTP_OK);
+ when(successOp.getResult()).thenReturn(http200Op);
+
+ AbfsRestOperationException conflictResponseEx
+ = getMockAbfsRestOperationException(HTTP_CONFLICT);
+ AbfsRestOperationException serverErrorResponseEx
+ = getMockAbfsRestOperationException(HTTP_INTERNAL_ERROR);
+ AbfsRestOperationException fileNotFoundResponseEx
+ = getMockAbfsRestOperationException(HTTP_NOT_FOUND);
+ AbfsRestOperationException preConditionResponseEx
+ = getMockAbfsRestOperationException(HTTP_PRECON_FAILED);
+
+ doCallRealMethod().when(mockClient)
+ .conditionalCreateOverwriteFile(anyString(),
+ Mockito.nullable(FileSystem.Statistics.class),
+ Mockito.nullable(AzureBlobFileSystemStore.Permissions.class),
+ anyBoolean(),
+ Mockito.nullable(ContextEncryptionAdapter.class),
+ Mockito.nullable(TracingContext.class));
+
+ // mock for overwrite=false
+ doThrow(conflictResponseEx) // Scn1: GFS fails with Http404
+ .doThrow(conflictResponseEx) // Scn2: GFS fails with Http500
+ .doThrow(
+ conflictResponseEx) // Scn3: create overwrite=true fails with Http412
+ .doThrow(
+ conflictResponseEx) // Scn4: create overwrite=true fails with Http500
+ .doThrow(
+ serverErrorResponseEx) // Scn5: create overwrite=false fails with Http500
+ .when(mockClient)
+ .createPath(any(String.class), eq(true), eq(false),
+ any(AzureBlobFileSystemStore.Permissions.class),
+ any(boolean.class), eq(null), any(),
+ any(TracingContext.class));
+
+ doThrow(fileNotFoundResponseEx) // Scn1: GFS fails with Http404
+ .doThrow(serverErrorResponseEx) // Scn2: GFS fails with Http500
+ .doReturn(
+ successOp) // Scn3: create overwrite=true fails with Http412
+ .doReturn(
+ successOp) // Scn4: create overwrite=true fails with Http500
+ .when(mockClient)
+ .getPathStatus(any(String.class), eq(false),
+ any(TracingContext.class), nullable(
+ ContextEncryptionAdapter.class));
+
+ // mock for overwrite=true
+ doThrow(
+ preConditionResponseEx) // Scn3: create overwrite=true fails with Http412
+ .doThrow(
+ serverErrorResponseEx) // Scn4: create overwrite=true fails with Http500
+ .when(mockClient)
+ .createPath(any(String.class), eq(true), eq(true),
+ any(AzureBlobFileSystemStore.Permissions.class),
+ any(boolean.class), eq(null), any(),
+ any(TracingContext.class));
+
+ if (mockClient instanceof AbfsBlobClient) {
+ doReturn(false).when((AbfsBlobClient) mockClient)
+ .isNonEmptyDirectory(anyString(),
+ Mockito.nullable(TracingContext.class));
+
+ doNothing().when((AbfsBlobClient) mockClient)
+ .tryMarkerCreation(anyString(),
+ anyBoolean(),
+ Mockito.nullable(String.class),
+ Mockito.nullable(ContextEncryptionAdapter.class),
+ Mockito.nullable(TracingContext.class));
+
+ // mock for overwrite=true
+ doThrow(
+ preConditionResponseEx) // Scn3: create overwrite=true fails with Http412
+ .doThrow(
+ serverErrorResponseEx) // Scn4: create overwrite=true fails with Http500
+ .when((AbfsBlobClient) mockClient)
+ .createPathRestOp(any(String.class), eq(true), eq(true),
+ any(boolean.class), eq(null), any(),
+ any(TracingContext.class));
+
+ // mock for overwrite=false
+ doThrow(conflictResponseEx) // Scn1: GFS fails with Http404
+ .doThrow(conflictResponseEx) // Scn2: GFS fails with Http500
+ .doThrow(
+ conflictResponseEx) // Scn3: create overwrite=true fails with Http412
+ .doThrow(
+ conflictResponseEx) // Scn4: create overwrite=true fails with Http500
+ .doThrow(
+ serverErrorResponseEx)
+ // Scn5: create overwrite=false fails with Http500
+ .when((AbfsBlobClient) mockClient)
+ .createPathRestOp(any(String.class), eq(true), eq(false),
+ any(boolean.class), eq(null), any(),
+ any(TracingContext.class));
+
+ doThrow(fileNotFoundResponseEx) // Scn1: GFS fails with Http404
+ .doThrow(serverErrorResponseEx) // Scn2: GFS fails with Http500
+ .doReturn(
+ successOp) // Scn3: create overwrite=true fails with Http412
+ .doReturn(
+ successOp) // Scn4: create overwrite=true fails with Http500
+ .when((AbfsBlobClient) mockClient)
+ .getPathStatus(any(String.class), any(TracingContext.class),
+ nullable(
+ ContextEncryptionAdapter.class), eq(false));
+ }
+
+ // Scn1: GFS fails with Http404
+ // Sequence of events expected:
+ // 1. create overwrite=false - fail with conflict
+ // 2. GFS - fail with File Not found
+ // Create will fail with ConcurrentWriteOperationDetectedException
+ validateCreateFileException(
+ ConcurrentWriteOperationDetectedException.class,
+ abfsStore);
+
+ // Scn2: GFS fails with Http500
+ // Sequence of events expected:
+ // 1. create overwrite=false - fail with conflict
+ // 2. GFS - fail with Server error
+ // Create will fail with 500
+ validateCreateFileException(AbfsRestOperationException.class,
+ abfsStore);
+
+ // Scn3: create overwrite=true fails with Http412
+ // Sequence of events expected:
+ // 1. create overwrite=false - fail with conflict
+ // 2. GFS - pass
+ // 3. create overwrite=true - fail with Pre-Condition
+ // Create will fail with ConcurrentWriteOperationDetectedException
+ validateCreateFileException(
+ ConcurrentWriteOperationDetectedException.class,
+ abfsStore);
+
+ // Scn4: create overwrite=true fails with Http500
+ // Sequence of events expected:
+ // 1. create overwrite=false - fail with conflict
+ // 2. GFS - pass
+ // 3. create overwrite=true - fail with Server error
+ // Create will fail with 500
+ validateCreateFileException(AbfsRestOperationException.class,
+ abfsStore);
+
+ // Scn5: create overwrite=false fails with Http500
+ // Sequence of events expected:
+ // 1. create overwrite=false - fail with server error
+ // Create will fail with 500
+ validateCreateFileException(AbfsRestOperationException.class,
+ abfsStore);
+ }
+ }
+ }
+
+ @Test
+ public void testCreateMarkerFailExceptionIsSwallowed() throws Throwable {
+ assumeBlobServiceType();
+ try (AzureBlobFileSystem currentFs = getFileSystem()) {
+ Configuration config = new Configuration(this.getRawConfiguration());
+ config.set("fs.azure.enable.conditional.create.overwrite", Boolean.toString(true));
+
+ try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(), config)) {
+ AbfsClient mockClient = Mockito.spy(fs.getAbfsClient());
+ AzureBlobFileSystemStore spiedStore = Mockito.spy(fs.getAbfsStore());
+ spiedStore.setClient(mockClient);
+
+ AbfsClientHandler clientHandler = Mockito.mock(AbfsClientHandler.class);
+ when(clientHandler.getIngressClient()).thenReturn(mockClient);
+ when(clientHandler.getClient(Mockito.any())).thenReturn(mockClient);
+ Path testFolder = new Path("/dir1");
+ createAzCopyFolder(testFolder);
+
+ AzureBlobFileSystemStore abfsStore = fs.getAbfsStore();
+ ReflectionUtils.setFinalField(AzureBlobFileSystemStore.class, abfsStore, "clientHandler", clientHandler);
+ ReflectionUtils.setFinalField(AzureBlobFileSystemStore.class, abfsStore, "client", mockClient);
+
+ AbfsRestOperation successOp = mock(AbfsRestOperation.class);
+ AbfsHttpOperation http200Op = mock(AbfsHttpOperation.class);
+ when(http200Op.getStatusCode()).thenReturn(HTTP_OK);
+ when(successOp.getResult()).thenReturn(http200Op);
+
+ AbfsRestOperationException preConditionResponseEx = getMockAbfsRestOperationException(HTTP_PRECON_FAILED);
+
+ doCallRealMethod().when(mockClient)
+ .conditionalCreateOverwriteFile(anyString(),
+ Mockito.nullable(FileSystem.Statistics.class),
+ Mockito.nullable(AzureBlobFileSystemStore.Permissions.class),
+ anyBoolean(),
+ Mockito.nullable(ContextEncryptionAdapter.class),
+ Mockito.nullable(TracingContext.class));
+
+ doCallRealMethod().when((AbfsBlobClient) mockClient)
+ .tryMarkerCreation(anyString(), anyBoolean(), Mockito.nullable(String.class),
+ Mockito.nullable(ContextEncryptionAdapter.class),
+ Mockito.nullable(TracingContext.class));
+
+ Mockito.doReturn(new ArrayList<>(Collections.singletonList(testFolder)))
+ .when((AbfsBlobClient) mockClient)
+ .getMarkerPathsTobeCreated(any(Path.class), Mockito.nullable(TracingContext.class));
+
+ doReturn(false).when((AbfsBlobClient) mockClient)
+ .isNonEmptyDirectory(anyString(), Mockito.nullable(TracingContext.class));
+
+ doAnswer(new Answer() {
+ private boolean firstCall = true;
+
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ if (firstCall) {
+ firstCall = false;
+ throw preConditionResponseEx;
+ }
+ return null;
+ }
+ }).doCallRealMethod()
+ .when((AbfsBlobClient) mockClient)
+ .createPathRestOp(anyString(), anyBoolean(), anyBoolean(),
+ anyBoolean(), Mockito.nullable(String.class),
+ Mockito.nullable(ContextEncryptionAdapter.class),
+ Mockito.nullable(TracingContext.class));
+
+ AbfsClientTestUtil.hookOnRestOpsForTracingContextSingularity(mockClient);
+
+ doReturn(successOp)
+ .when((AbfsBlobClient) mockClient)
+ .getPathStatus(any(String.class), any(TracingContext.class),
+ nullable(ContextEncryptionAdapter.class), eq(false));
+
+ FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
+ FsPermission umask = new FsPermission(FsAction.NONE, FsAction.NONE, FsAction.NONE);
+ Path testPath = new Path("/dir1/testFile");
+ abfsStore.createFile(testPath, null, true, permission, umask,
+ getTestTracingContext(getFileSystem(), true));
+ Assertions.assertThat(fs.exists(testPath))
+ .describedAs("File not created when marker creation failed.")
+ .isTrue();
+ }
+ }
}
private void validateCreateFileException(final Class exceptionClass, final AzureBlobFileSystemStore abfsStore)
@@ -683,19 +868,19 @@ private AbfsRestOperationException getMockAbfsRestOperationException(int status)
return new AbfsRestOperationException(status, "", "", new Exception());
}
-
/**
* Attempts to test multiple flush calls.
*/
@Test
public void testMultipleFlush() throws Throwable {
- final AzureBlobFileSystem fs = getFileSystem();
- Path testPath = new Path(TEST_FOLDER_PATH, TEST_CHILD_FILE);
- try (FSDataOutputStream out = fs.create(testPath)) {
- out.write('1');
- out.hsync();
- out.write('2');
- out.hsync();
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ Path testPath = new Path(TEST_FOLDER_PATH, TEST_CHILD_FILE);
+ try (FSDataOutputStream out = fs.create(testPath)) {
+ out.write('1');
+ out.hsync();
+ out.write('2');
+ out.hsync();
+ }
}
}
@@ -704,26 +889,29 @@ public void testMultipleFlush() throws Throwable {
*/
@Test
public void testDeleteBeforeFlush() throws Throwable {
- final AzureBlobFileSystem fs = getFileSystem();
- Path testPath = new Path(TEST_FOLDER_PATH, TEST_CHILD_FILE);
- try (FSDataOutputStream out = fs.create(testPath)) {
- out.write('1');
- fs.delete(testPath, false);
- out.hsync();
- // this will cause the next write to failAll
- } catch (IOException fnfe) {
- //appendblob outputStream does not generate suppressed exception on close as it is
- //single threaded code
- if (!fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(testPath).toString())) {
- // the exception raised in close() must be in the caught exception's
- // suppressed list
- Throwable[] suppressed = fnfe.getSuppressed();
- assertEquals("suppressed count", 1, suppressed.length);
- Throwable inner = suppressed[0];
- if (!(inner instanceof IOException)) {
- throw inner;
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ Path testPath = new Path(TEST_FOLDER_PATH, TEST_CHILD_FILE);
+ try (FSDataOutputStream out = fs.create(testPath)) {
+ out.write('1');
+ fs.delete(testPath, false);
+ out.hsync();
+ // this will cause the next write to failAll
+ } catch (IOException fnfe) {
+ //appendblob outputStream does not generate suppressed exception on close as it is
+ //single threaded code
+ if (!fs.getAbfsStore()
+ .isAppendBlobKey(fs.makeQualified(testPath).toString())) {
+ // the exception raised in close() must be in the caught exception's
+ // suppressed list
+ Throwable[] suppressed = fnfe.getSuppressed();
+ assertEquals("suppressed count", 1, suppressed.length);
+ Throwable inner = suppressed[0];
+ if (!(inner instanceof IOException)) {
+ throw inner;
+ }
+ GenericTestUtils.assertExceptionContains(fnfe.getMessage(),
+ inner.getCause(), inner.getCause().getMessage());
}
- GenericTestUtils.assertExceptionContains(fnfe.getMessage(), inner.getCause(), inner.getCause().getMessage());
}
}
}
@@ -734,19 +922,32 @@ public void testDeleteBeforeFlush() throws Throwable {
*/
@Test
public void testMkdirsFailsForSubdirectoryOfExistingFile() throws Exception {
- final AzureBlobFileSystem fs = getFileSystem();
- fs.create(new Path("a/b/c"));
- fs.mkdirs(new Path("a/b/d"));
- intercept(IOException.class, () -> fs.mkdirs(new Path("a/b/c/d/e")));
-
- Assertions.assertThat(fs.exists(new Path("a/b/c"))).isTrue();
- Assertions.assertThat(fs.exists(new Path("a/b/d"))).isTrue();
- // Asserting directory created still exists as explicit.
- Assertions.assertThat(
- DirectoryStateHelper.isExplicitDirectory(new Path("a/b/d"), fs,
- getTestTracingContext(fs, true)))
- .describedAs("Path is not an explicit directory")
- .isTrue();
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ fs.create(new Path("a/b/c"));
+ fs.mkdirs(new Path("a/b/d"));
+ intercept(IOException.class, () -> fs.mkdirs(new Path("a/b/c/d/e")));
+
+ Assertions.assertThat(fs.exists(new Path("a/b/c"))).isTrue();
+ Assertions.assertThat(fs.exists(new Path("a/b/d"))).isTrue();
+ // Asserting directory created still exists as explicit.
+ Assertions.assertThat(
+ DirectoryStateHelper.isExplicitDirectory(new Path("a/b/d"), fs,
+ getTestTracingContext(fs, true)))
+ .describedAs("Path is not an explicit directory")
+ .isTrue();
+ }
+ }
+
+ /**
+ * Calling mkdir for existing implicit directory.
+ * @throws Exception
+ */
+ @Test
+ public void testMkdirSameFolder() throws Exception {
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ createAzCopyFolder(new Path("a/b/d"));
+ fs.mkdirs(new Path("a/b/d"));
+ }
}
/**
@@ -755,16 +956,17 @@ public void testMkdirsFailsForSubdirectoryOfExistingFile() throws Exception {
*/
@Test
public void testCreateDirectoryAndFile() throws Exception {
- final AzureBlobFileSystem fs = getFileSystem();
- fs.mkdirs(new Path("a/b/c"));
- Assertions.assertThat(fs.exists(new Path("a/b/c"))).isTrue();
- intercept(IOException.class, () -> fs.create(new Path("a/b/c")));
- // Asserting that directory still exists as explicit
- Assertions.assertThat(
- DirectoryStateHelper.isExplicitDirectory(new Path("a/b/c"),
- fs, getTestTracingContext(fs, true)))
- .describedAs("Path is not an explicit directory")
- .isTrue();
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ fs.mkdirs(new Path("a/b/c"));
+ Assertions.assertThat(fs.exists(new Path("a/b/c"))).isTrue();
+ intercept(IOException.class, () -> fs.create(new Path("a/b/c")));
+ // Asserting that directory still exists as explicit
+ Assertions.assertThat(
+ DirectoryStateHelper.isExplicitDirectory(new Path("a/b/c"),
+ fs, getTestTracingContext(fs, true)))
+ .describedAs("Path is not an explicit directory")
+ .isTrue();
+ }
}
/**
@@ -773,12 +975,63 @@ fs, getTestTracingContext(fs, true)))
*/
@Test
public void testCreateSameFile() throws Exception {
- final AzureBlobFileSystem fs = getFileSystem();
- fs.create(new Path("a/b/c"));
- fs.create(new Path("a/b/c"));
- Assertions.assertThat(fs.exists(new Path("a/b/c")))
- .describedAs("Path does not exist")
- .isTrue();
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ fs.create(new Path("a/b/c"));
+ fs.create(new Path("a/b/c"));
+ Assertions.assertThat(fs.exists(new Path("a/b/c")))
+ .describedAs("Path does not exist")
+ .isTrue();
+ }
+ }
+
+ /**
+ * Test the creation of a file without conditional overwrite.
+ * This test sets the configuration `fs.azure.enable.conditional.create.overwrite` to false,
+ * creates a directory, and then attempts to create a file at the same path with overwrite set to true.
+ * It expects an IOException to be thrown.
+ *
+ * @throws Exception if any exception occurs during the test execution
+ */
+ @Test
+ public void testCreationWithoutConditionalOverwrite()
+ throws Exception {
+ try (AzureBlobFileSystem currentFs = getFileSystem()) {
+ Configuration config = new Configuration(this.getRawConfiguration());
+ config.set("fs.azure.enable.conditional.create.overwrite",
+ String.valueOf(false));
+
+ try (AzureBlobFileSystem fs =
+ (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(),
+ config)) {
+ fs.mkdirs(new Path("a/b/c"));
+ intercept(IOException.class,
+ () -> fs.create(new Path("a/b/c"), true));
+ }
+ }
+ }
+
+ /**
+ * Test the creation of a file with overwrite set to false without conditional overwrite.
+ * This test sets the configuration `fs.azure.enable.conditional.create.overwrite` to false,
+ * creates a directory, and then attempts to create a file at the same path with overwrite set to false.
+ * It expects an IOException to be thrown.
+ *
+ * @throws Exception if any exception occurs during the test execution
+ */
+ @Test
+ public void testCreationOverwriteFalseWithoutConditionalOverwrite() throws Exception {
+ try (AzureBlobFileSystem currentFs = getFileSystem()) {
+ Configuration config = new Configuration(this.getRawConfiguration());
+ config.set("fs.azure.enable.conditional.create.overwrite",
+ String.valueOf(false));
+
+ try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
+ currentFs.getUri(), config)) {
+ fs.mkdirs(new Path("a/b/c"));
+ intercept(IOException.class,
+ () -> fs.create(new Path("a/b/c"), false));
+ }
+ }
}
/**
@@ -787,12 +1040,14 @@ public void testCreateSameFile() throws Exception {
*/
@Test
public void testCreateSameFileWithOverwriteFalse() throws Exception {
- final AzureBlobFileSystem fs = getFileSystem();
- fs.create(new Path("a/b/c"));
- Assertions.assertThat(fs.exists(new Path("a/b/c")))
- .describedAs("Path does not exist")
- .isTrue();
- intercept(IOException.class, () -> fs.create(new Path("a/b/c"), false));
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ fs.create(new Path("a/b/c"));
+ Assertions.assertThat(fs.exists(new Path("a/b/c")))
+ .describedAs("Path does not exist")
+ .isTrue();
+ intercept(IOException.class,
+ () -> fs.create(new Path("a/b/c"), false));
+ }
}
/**
@@ -801,12 +1056,104 @@ public void testCreateSameFileWithOverwriteFalse() throws Exception {
*/
@Test
public void testCreateSubPath() throws Exception {
- final AzureBlobFileSystem fs = getFileSystem();
- fs.create(new Path("a/b/c"));
- Assertions.assertThat(fs.exists(new Path("a/b/c")))
- .describedAs("Path does not exist")
- .isTrue();
- intercept(IOException.class, () -> fs.create(new Path("a/b")));
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ fs.create(new Path("a/b/c"));
+ Assertions.assertThat(fs.exists(new Path("a/b/c")))
+ .describedAs("Path does not exist")
+ .isTrue();
+ intercept(IOException.class,
+ () -> fs.create(new Path("a/b")));
+ }
+ }
+
+ /**
+ * Test create path in parallel with overwrite false.
+ **/
+ @Test
+ public void testParallelCreateOverwriteFalse()
+ throws Exception {
+ Configuration configuration = getRawConfiguration();
+ configuration.set(FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE, "false");
+ try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
+ configuration)) {
+ ExecutorService executorService = Executors.newFixedThreadPool(5);
+ List> futures = new ArrayList<>();
+
+ final byte[] b = new byte[8 * ONE_MB];
+ new Random().nextBytes(b);
+ final Path filePath = path("/testPath");
+
+ futures.add(executorService.submit(() -> {
+ try {
+ fs.create(filePath, false);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }));
+
+ futures.add(executorService.submit(() -> {
+ try {
+ fs.create(filePath, false);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }));
+
+ futures.add(executorService.submit(() -> {
+ try {
+ fs.create(filePath, false);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }));
+
+ checkFuturesForExceptions(futures, 2);
+ }
+ }
+
+ /**
+ * Test create path in parallel with overwrite true.
+ **/
+ @Test
+ public void testParallelCreateOverwriteTrue()
+ throws Exception {
+ Configuration configuration = getRawConfiguration();
+ configuration.set(FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE, "false");
+ try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
+ configuration)) {
+ ExecutorService executorService = Executors.newFixedThreadPool(5);
+ List> futures = new ArrayList<>();
+
+ final byte[] b = new byte[8 * ONE_MB];
+ new Random().nextBytes(b);
+ final Path filePath = path("/testPath");
+
+ futures.add(executorService.submit(() -> {
+ try {
+ fs.create(filePath);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }));
+
+ futures.add(executorService.submit(() -> {
+ try {
+ fs.create(filePath);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }));
+
+ futures.add(executorService.submit(() -> {
+ try {
+ fs.create(filePath);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }));
+
+ checkFuturesForExceptions(futures, 0);
+ }
}
/**
@@ -814,22 +1161,49 @@ public void testCreateSubPath() throws Exception {
*/
@Test
public void testCreatePathParentExplicit() throws Exception {
- final AzureBlobFileSystem fs = getFileSystem();
- fs.mkdirs(new Path("a/b/c"));
- Assertions.assertThat(fs.exists(new Path("a/b/c")))
- .describedAs("Path does not exist")
- .isTrue();
- fs.create(new Path("a/b/c/d"));
- Assertions.assertThat(fs.exists(new Path("a/b/c/d")))
- .describedAs("Path does not exist")
- .isTrue();
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ fs.mkdirs(new Path("a/b/c"));
+ Assertions.assertThat(fs.exists(new Path("a/b/c")))
+ .describedAs("Path does not exist")
+ .isTrue();
+ fs.create(new Path("a/b/c/d"));
+ Assertions.assertThat(fs.exists(new Path("a/b/c/d")))
+ .describedAs("Path does not exist")
+ .isTrue();
- // asserting that parent stays explicit
- Assertions.assertThat(
- DirectoryStateHelper.isExplicitDirectory(new Path("a/b/c"),
- fs, getTestTracingContext(fs, true)))
- .describedAs("Path is not an explicit directory")
- .isTrue();
+ // asserting that parent stays explicit
+ Assertions.assertThat(
+ DirectoryStateHelper.isExplicitDirectory(new Path("a/b/c"),
+ fs, getTestTracingContext(fs, true)))
+ .describedAs("Path is not an explicit directory")
+ .isTrue();
+ }
+ }
+
+ // Creation with append blob should succeed for blob endpoint
+ @Test
+ public void testCreateWithAppendBlobEnabled()
+ throws IOException, NoSuchFieldException, IllegalAccessException {
+ Configuration conf = getRawConfiguration();
+ try (AzureBlobFileSystem fs = Mockito.spy(
+ (AzureBlobFileSystem) FileSystem.newInstance(conf))) {
+ AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+ doReturn(true).when(store).isAppendBlobKey(anyString());
+
+ // Set abfsStore as our mocked value.
+ Field privateField = AzureBlobFileSystem.class.getDeclaredField(
+ "abfsStore");
+ privateField.setAccessible(true);
+ privateField.set(fs, store);
+ Path testPath = path("/testPath");
+ AzureBlobFileSystemStore.Permissions permissions
+ = new AzureBlobFileSystemStore.Permissions(false,
+ FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
+ fs.getAbfsStore().getClientHandler().getBlobClient().
+ createPath(makeQualified(testPath).toUri().getPath(), true, false,
+ permissions, true, null,
+ null, getTestTracingContext(fs, true));
+ }
}
/**
@@ -838,24 +1212,29 @@ fs, getTestTracingContext(fs, true)))
*/
@Test
public void testParentExplicitPathImplicit() throws Exception {
- final AzureBlobFileSystem fs = getFileSystem();
- Assume.assumeTrue(getIngressServiceType() == AbfsServiceType.BLOB);
- fs.mkdirs(new Path("/explicitParent"));
- String sourcePathName = "/explicitParent/implicitDir";
- Path sourcePath = new Path(sourcePathName);
- createAzCopyFolder(sourcePath);
-
- intercept(IOException.class, () ->
- fs.create(sourcePath, true));
- intercept(IOException.class, () ->
- fs.create(sourcePath, false));
-
- Assertions.assertThat(DirectoryStateHelper.isExplicitDirectory(sourcePath.getParent(), fs, getTestTracingContext(fs, true)))
- .describedAs("Parent directory should be explicit.")
- .isTrue();
- Assertions.assertThat(DirectoryStateHelper.isImplicitDirectory(sourcePath, fs, getTestTracingContext(fs, true)))
- .describedAs("Path should be implicit.")
- .isTrue();
+ assumeBlobServiceType();
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ fs.mkdirs(new Path("/explicitParent"));
+ String sourcePathName = "/explicitParent/implicitDir";
+ Path sourcePath = new Path(sourcePathName);
+ createAzCopyFolder(sourcePath);
+
+ intercept(IOException.class, () ->
+ fs.create(sourcePath, true));
+ intercept(IOException.class, () ->
+ fs.create(sourcePath, false));
+
+ Assertions.assertThat(
+ DirectoryStateHelper.isExplicitDirectory(sourcePath.getParent(), fs,
+ getTestTracingContext(fs, true)))
+ .describedAs("Parent directory should be explicit.")
+ .isTrue();
+ Assertions.assertThat(
+ DirectoryStateHelper.isImplicitDirectory(sourcePath, fs,
+ getTestTracingContext(fs, true)))
+ .describedAs("Path should be implicit.")
+ .isTrue();
+ }
}
/**
@@ -864,67 +1243,118 @@ public void testParentExplicitPathImplicit() throws Exception {
*/
@Test
public void testParentImplicitPathImplicit() throws Exception {
- final AzureBlobFileSystem fs = getFileSystem();
- Assume.assumeTrue(getIngressServiceType() == AbfsServiceType.BLOB);
- String parentPathName = "/implicitParent";
- Path parentPath = new Path(parentPathName);
- String sourcePathName = "/implicitParent/implicitDir";
- Path sourcePath = new Path(sourcePathName);
-
- createAzCopyFolder(parentPath);
- createAzCopyFolder(sourcePath);
-
- intercept(IOException.class, () ->
- fs.create(sourcePath, true));
- intercept(IOException.class, () ->
- fs.create(sourcePath, false));
-
- Assertions.assertThat(DirectoryStateHelper.isImplicitDirectory(parentPath, fs, getTestTracingContext(fs, true)))
- .describedAs("Parent directory is implicit.")
- .isTrue();
- Assertions.assertThat(DirectoryStateHelper.isImplicitDirectory(sourcePath, fs, getTestTracingContext(fs, true)))
- .describedAs("Path should also be implicit.")
- .isTrue();
+ assumeBlobServiceType();
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ String parentPathName = "/implicitParent";
+ Path parentPath = new Path(parentPathName);
+ String sourcePathName = "/implicitParent/implicitDir";
+ Path sourcePath = new Path(sourcePathName);
+
+ createAzCopyFolder(parentPath);
+ createAzCopyFolder(sourcePath);
+
+ intercept(IOException.class, () ->
+ fs.create(sourcePath, true));
+ intercept(IOException.class, () ->
+ fs.create(sourcePath, false));
+
+ Assertions.assertThat(
+ DirectoryStateHelper.isImplicitDirectory(parentPath, fs,
+ getTestTracingContext(fs, true)))
+ .describedAs("Parent directory is implicit.")
+ .isTrue();
+ Assertions.assertThat(
+ DirectoryStateHelper.isImplicitDirectory(sourcePath, fs,
+ getTestTracingContext(fs, true)))
+ .describedAs("Path should also be implicit.")
+ .isTrue();
+ }
}
/**
- * Tests create file when file exists already
+ * Tests create file when file exists already and parent is implicit
* Verifies using eTag for overwrite = true/false
*/
@Test
public void testCreateFileExistsImplicitParent() throws Exception {
- final AzureBlobFileSystem fs = getFileSystem();
- final AzureBlobFileSystemStore store = fs.getAbfsStore();
- String parentPathName = "/implicitParent";
- Path parentPath = new Path(parentPathName);
- createAzCopyFolder(parentPath);
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ String parentPathName = "/implicitParent";
+ Path parentPath = new Path(parentPathName);
+ createAzCopyFolder(parentPath);
- String fileName = "/implicitParent/testFile";
- Path filePath = new Path(fileName);
- fs.create(filePath);
- String eTag = extractFileEtag(fileName);
+ String fileName = "/implicitParent/testFile";
+ Path filePath = new Path(fileName);
+ fs.create(filePath);
+ String eTag = extractFileEtag(fileName);
- // testing createFile on already existing file path
- fs.create(filePath, true);
+ // testing createFile on already existing file path
+ fs.create(filePath, true);
- String eTagAfterCreateOverwrite = extractFileEtag(fileName);
+ String eTagAfterCreateOverwrite = extractFileEtag(fileName);
- Assertions.assertThat(eTag.equals(eTagAfterCreateOverwrite))
- .describedAs("New file eTag after create overwrite should be different from old")
- .isFalse();
+ Assertions.assertThat(eTag.equals(eTagAfterCreateOverwrite))
+ .describedAs(
+ "New file eTag after create overwrite should be different from old")
+ .isFalse();
- intercept(IOException.class, () ->
- fs.create(filePath, false));
+ intercept(IOException.class, () ->
+ fs.create(filePath, false));
- String eTagAfterCreate = extractFileEtag(fileName);
+ String eTagAfterCreate = extractFileEtag(fileName);
- Assertions.assertThat(eTagAfterCreateOverwrite.equals(eTagAfterCreate))
- .describedAs("File eTag should not change as creation fails")
- .isTrue();
+ Assertions.assertThat(eTagAfterCreateOverwrite.equals(eTagAfterCreate))
+ .describedAs("File eTag should not change as creation fails")
+ .isTrue();
- Assertions.assertThat(DirectoryStateHelper.isExplicitDirectory(parentPath, fs, getTestTracingContext(fs, true)))
- .describedAs("Parent path should also change to explicit.")
- .isTrue();
+ Assertions.assertThat(
+ DirectoryStateHelper.isExplicitDirectory(parentPath, fs,
+ getTestTracingContext(fs, true)))
+ .describedAs("Parent path should also change to explicit.")
+ .isTrue();
+ }
+ }
+
+ /**
+ * Tests create file when file exists already and parent is explicit
+ * Verifies using eTag for overwrite = true/false
+ */
+ @Test
+ public void testCreateFileExistsExplicitParent() throws Exception {
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ String parentPathName = "/explicitParent";
+ Path parentPath = new Path(parentPathName);
+ fs.mkdirs(parentPath);
+
+ String fileName = "/explicitParent/testFile";
+ Path filePath = new Path(fileName);
+ fs.create(filePath);
+ String eTag = extractFileEtag(fileName);
+
+ // testing createFile on already existing file path
+ fs.create(filePath, true);
+
+ String eTagAfterCreateOverwrite = extractFileEtag(fileName);
+
+ Assertions.assertThat(eTag.equals(eTagAfterCreateOverwrite))
+ .describedAs(
+ "New file eTag after create overwrite should be different from old")
+ .isFalse();
+
+ intercept(IOException.class, () ->
+ fs.create(filePath, false));
+
+ String eTagAfterCreate = extractFileEtag(fileName);
+
+ Assertions.assertThat(eTagAfterCreateOverwrite.equals(eTagAfterCreate))
+ .describedAs("File eTag should not change as creation fails")
+ .isTrue();
+
+ Assertions.assertThat(
+ DirectoryStateHelper.isExplicitDirectory(parentPath, fs,
+ getTestTracingContext(fs, true)))
+ .describedAs("Parent path should also change to explicit.")
+ .isTrue();
+ }
}
/**
@@ -934,24 +1364,24 @@ public void testCreateFileExistsImplicitParent() throws Exception {
*/
@Test
public void testCreateFileParentFile() throws Exception {
- final AzureBlobFileSystem fs = getFileSystem();
- final AzureBlobFileSystemStore store = fs.getAbfsStore();
-
- String parentName = "/testParentFile";
- Path parent = new Path(parentName);
- fs.create(parent);
-
- String childName = "/testParentFile/testChildFile";
- Path child = new Path(childName);
- IOException e = intercept(IOException.class, () ->
- fs.create(child, false));
-
- // asserting that parent stays explicit
- FileStatus status = fs.getAbfsStore().getFileStatus(fs.makeQualified(new Path(parentName)),
- new TracingContext(getTestTracingContext(fs, true)));
- Assertions.assertThat(status.isDirectory())
- .describedAs("Path is not a file")
- .isFalse();
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ String parentName = "/testParentFile";
+ Path parent = new Path(parentName);
+ fs.create(parent);
+
+ String childName = "/testParentFile/testChildFile";
+ Path child = new Path(childName);
+ IOException e = intercept(IOException.class, () ->
+ fs.create(child, false));
+
+ // asserting that parent stays explicit
+ FileStatus status = fs.getAbfsStore()
+ .getFileStatus(fs.makeQualified(new Path(parentName)),
+ new TracingContext(getTestTracingContext(fs, true)));
+ Assertions.assertThat(status.isDirectory())
+ .describedAs("Path is not a file")
+ .isFalse();
+ }
}
/**
@@ -960,9 +1390,11 @@ public void testCreateFileParentFile() throws Exception {
*/
@Test
public void testCreateMkdirs() throws Exception {
- final AzureBlobFileSystem fs = getFileSystem();
- fs.create(new Path("a/b/c"));
- intercept(IOException.class, () -> fs.mkdirs(new Path("a/b/c/d")));
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ fs.create(new Path("a/b/c"));
+ intercept(IOException.class,
+ () -> fs.mkdirs(new Path("a/b/c/d")));
+ }
}
/**
@@ -971,37 +1403,41 @@ public void testCreateMkdirs() throws Exception {
*/
@Test
public void testMkdirs() throws Exception {
- final AzureBlobFileSystem fs = getFileSystem();
- fs.mkdirs(new Path("a/b"));
- fs.mkdirs(new Path("a/b/c/d"));
- fs.mkdirs(new Path("a/b/c/e"));
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ fs.mkdirs(new Path("a/b"));
+ fs.mkdirs(new Path("a/b/c/d"));
+ fs.mkdirs(new Path("a/b/c/e"));
- Assertions.assertThat(fs.exists(new Path("a/b")))
- .describedAs("Path a/b does not exist")
- .isTrue();
- Assertions.assertThat(fs.exists(new Path("a/b/c/d")))
- .describedAs("Path a/b/c/d does not exist")
- .isTrue();
- Assertions.assertThat(fs.exists(new Path("a/b/c/e")))
- .describedAs("Path a/b/c/e does not exist")
- .isTrue();
+ Assertions.assertThat(fs.exists(new Path("a/b")))
+ .describedAs("Path a/b does not exist")
+ .isTrue();
+ Assertions.assertThat(fs.exists(new Path("a/b/c/d")))
+ .describedAs("Path a/b/c/d does not exist")
+ .isTrue();
+ Assertions.assertThat(fs.exists(new Path("a/b/c/e")))
+ .describedAs("Path a/b/c/e does not exist")
+ .isTrue();
- // Asserting that directories created as explicit
- FileStatus status = fs.getAbfsStore().getFileStatus(fs.makeQualified(new Path("a/b")),
- new TracingContext(getTestTracingContext(fs, true)));
- Assertions.assertThat(status.isDirectory())
- .describedAs("Path a/b is not an explicit directory")
- .isTrue();
- FileStatus status1 = fs.getAbfsStore().getFileStatus(fs.makeQualified(new Path("a/b/c/d")),
- new TracingContext(getTestTracingContext(fs, true)));
- Assertions.assertThat(status1.isDirectory())
- .describedAs("Path a/b/c/d is not an explicit directory")
- .isTrue();
- FileStatus status2 = fs.getAbfsStore().getFileStatus(fs.makeQualified(new Path("a/b/c/e")),
- new TracingContext(getTestTracingContext(fs, true)));
- Assertions.assertThat(status2.isDirectory())
- .describedAs("Path a/b/c/e is not an explicit directory")
- .isTrue();
+ // Asserting that directories created as explicit
+ FileStatus status = fs.getAbfsStore()
+ .getFileStatus(fs.makeQualified(new Path("a/b")),
+ new TracingContext(getTestTracingContext(fs, true)));
+ Assertions.assertThat(status.isDirectory())
+ .describedAs("Path a/b is not an explicit directory")
+ .isTrue();
+ FileStatus status1 = fs.getAbfsStore()
+ .getFileStatus(fs.makeQualified(new Path("a/b/c/d")),
+ new TracingContext(getTestTracingContext(fs, true)));
+ Assertions.assertThat(status1.isDirectory())
+ .describedAs("Path a/b/c/d is not an explicit directory")
+ .isTrue();
+ FileStatus status2 = fs.getAbfsStore()
+ .getFileStatus(fs.makeQualified(new Path("a/b/c/e")),
+ new TracingContext(getTestTracingContext(fs, true)));
+ Assertions.assertThat(status2.isDirectory())
+ .describedAs("Path a/b/c/e is not an explicit directory")
+ .isTrue();
+ }
}
/**
@@ -1010,19 +1446,21 @@ public void testMkdirs() throws Exception {
*/
@Test
public void testMkdirsCreateSubPath() throws Exception {
- final AzureBlobFileSystem fs = getFileSystem();
- fs.mkdirs(new Path("a/b/c"));
- Assertions.assertThat(fs.exists(new Path("a/b/c")))
- .describedAs("Path a/b/c does not exist")
- .isTrue();
- intercept(IOException.class, () -> fs.create(new Path("a/b")));
-
- // Asserting that directories created as explicit
- FileStatus status2 = fs.getAbfsStore().getFileStatus(fs.makeQualified(new Path("a/b/c")),
- new TracingContext(getTestTracingContext(fs, true)));
- Assertions.assertThat(status2.isDirectory())
- .describedAs("Path a/b/c is not an explicit directory")
- .isTrue();
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ fs.mkdirs(new Path("a/b/c"));
+ Assertions.assertThat(fs.exists(new Path("a/b/c")))
+ .describedAs("Path a/b/c does not exist")
+ .isTrue();
+ intercept(IOException.class, () -> fs.create(new Path("a/b")));
+
+ // Asserting that directories created as explicit
+ FileStatus status2 = fs.getAbfsStore()
+ .getFileStatus(fs.makeQualified(new Path("a/b/c")),
+ new TracingContext(getTestTracingContext(fs, true)));
+ Assertions.assertThat(status2.isDirectory())
+ .describedAs("Path a/b/c is not an explicit directory")
+ .isTrue();
+ }
}
/**
@@ -1031,37 +1469,41 @@ public void testMkdirsCreateSubPath() throws Exception {
*/
@Test
public void testMkdirsByLevel() throws Exception {
- final AzureBlobFileSystem fs = getFileSystem();
- fs.mkdirs(new Path("a"));
- fs.mkdirs(new Path("a/b/c"));
- fs.mkdirs(new Path("a/b/c/d/e"));
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ fs.mkdirs(new Path("a"));
+ fs.mkdirs(new Path("a/b/c"));
+ fs.mkdirs(new Path("a/b/c/d/e"));
- Assertions.assertThat(fs.exists(new Path("a")))
- .describedAs("Path a does not exist")
- .isTrue();
- Assertions.assertThat(fs.exists(new Path("a/b/c")))
- .describedAs("Path a/b/c does not exist")
- .isTrue();
- Assertions.assertThat(fs.exists(new Path("a/b/c/d/e")))
- .describedAs("Path a/b/c/d/e does not exist")
- .isTrue();
+ Assertions.assertThat(fs.exists(new Path("a")))
+ .describedAs("Path a does not exist")
+ .isTrue();
+ Assertions.assertThat(fs.exists(new Path("a/b/c")))
+ .describedAs("Path a/b/c does not exist")
+ .isTrue();
+ Assertions.assertThat(fs.exists(new Path("a/b/c/d/e")))
+ .describedAs("Path a/b/c/d/e does not exist")
+ .isTrue();
- // Asserting that directories created as explicit
- FileStatus status = fs.getAbfsStore().getFileStatus(fs.makeQualified(new Path("a/")),
- new TracingContext(getTestTracingContext(fs, true)));
- Assertions.assertThat(status.isDirectory())
- .describedAs("Path a is not an explicit directory")
- .isTrue();
- FileStatus status1 = fs.getAbfsStore().getFileStatus(fs.makeQualified(new Path("a/b/c")),
- new TracingContext(getTestTracingContext(fs, true)));
- Assertions.assertThat(status1.isDirectory())
- .describedAs("Path a/b/c is not an explicit directory")
- .isTrue();
- FileStatus status2 = fs.getAbfsStore().getFileStatus(fs.makeQualified(new Path("a/b/c/d/e")),
- new TracingContext(getTestTracingContext(fs, true)));
- Assertions.assertThat(status2.isDirectory())
- .describedAs("Path a/b/c/d/e is not an explicit directory")
- .isTrue();
+ // Asserting that directories created as explicit
+ FileStatus status = fs.getAbfsStore()
+ .getFileStatus(fs.makeQualified(new Path("a/")),
+ new TracingContext(getTestTracingContext(fs, true)));
+ Assertions.assertThat(status.isDirectory())
+ .describedAs("Path a is not an explicit directory")
+ .isTrue();
+ FileStatus status1 = fs.getAbfsStore()
+ .getFileStatus(fs.makeQualified(new Path("a/b/c")),
+ new TracingContext(getTestTracingContext(fs, true)));
+ Assertions.assertThat(status1.isDirectory())
+ .describedAs("Path a/b/c is not an explicit directory")
+ .isTrue();
+ FileStatus status2 = fs.getAbfsStore()
+ .getFileStatus(fs.makeQualified(new Path("a/b/c/d/e")),
+ new TracingContext(getTestTracingContext(fs, true)));
+ Assertions.assertThat(status2.isDirectory())
+ .describedAs("Path a/b/c/d/e is not an explicit directory")
+ .isTrue();
+ }
}
/*
@@ -1069,14 +1511,15 @@ public void testMkdirsByLevel() throws Exception {
*/
@Test
public void testMkdirsWithDelete() throws Exception {
- final AzureBlobFileSystem fs = getFileSystem();
- fs.mkdirs(new Path("a/b"));
- fs.mkdirs(new Path("a/b/c/d"));
- fs.delete(new Path("a/b/c/d"));
- fs.getFileStatus(new Path("a/b/c"));
- Assertions.assertThat(fs.exists(new Path("a/b/c")))
- .describedAs("Path a/b/c does not exist")
- .isTrue();
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ fs.mkdirs(new Path("a/b"));
+ fs.mkdirs(new Path("a/b/c/d"));
+ fs.delete(new Path("a/b/c/d"));
+ fs.getFileStatus(new Path("a/b/c"));
+ Assertions.assertThat(fs.exists(new Path("a/b/c")))
+ .describedAs("Path a/b/c does not exist")
+ .isTrue();
+ }
}
/**
@@ -1084,16 +1527,17 @@ public void testMkdirsWithDelete() throws Exception {
*/
@Test
public void testMkdirsWithRename() throws Exception {
- final AzureBlobFileSystem fs = getFileSystem();
- fs.mkdirs(new Path("a/b/c/d"));
- fs.create(new Path("e/file"));
- fs.delete(new Path("a/b/c/d"));
- Assertions.assertThat(fs.rename(new Path("e"), new Path("a/b/c/d")))
- .describedAs("Failed to rename path e to a/b/c/d")
- .isTrue();
- Assertions.assertThat(fs.exists(new Path("a/b/c/d/file")))
- .describedAs("Path a/b/c/d/file does not exist")
- .isTrue();
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ fs.mkdirs(new Path("a/b/c/d"));
+ fs.create(new Path("e/file"));
+ fs.delete(new Path("a/b/c/d"));
+ Assertions.assertThat(fs.rename(new Path("e"), new Path("a/b/c/d")))
+ .describedAs("Failed to rename path e to a/b/c/d")
+ .isTrue();
+ Assertions.assertThat(fs.exists(new Path("a/b/c/d/file")))
+ .describedAs("Path a/b/c/d/file does not exist")
+ .isTrue();
+ }
}
/**
@@ -1102,11 +1546,13 @@ public void testMkdirsWithRename() throws Exception {
*/
@Test
public void testFileCreateMkdirsRoot() throws Exception {
- final AzureBlobFileSystem fs = getFileSystem();
- fs.setWorkingDirectory(new Path("/"));
- final Path p1 = new Path("dir1");
- fs.create(p1);
- intercept(IOException.class, () -> fs.mkdirs(new Path("dir1/dir2")));
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ fs.setWorkingDirectory(new Path("/"));
+ final Path p1 = new Path("dir1");
+ fs.create(p1);
+ intercept(IOException.class,
+ () -> fs.mkdirs(new Path("dir1/dir2")));
+ }
}
/**
@@ -1115,10 +1561,11 @@ public void testFileCreateMkdirsRoot() throws Exception {
*/
@Test
public void testFileCreateMkdirsNonRoot() throws Exception {
- final AzureBlobFileSystem fs = getFileSystem();
- final Path p1 = new Path("dir1");
- fs.create(p1);
- intercept(IOException.class, () -> fs.mkdirs(new Path("dir1/dir2")));
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ final Path p1 = new Path("dir1");
+ fs.create(p1);
+ intercept(IOException.class, () -> fs.mkdirs(new Path("dir1/dir2")));
+ }
}
/**
@@ -1127,19 +1574,21 @@ public void testFileCreateMkdirsNonRoot() throws Exception {
*/
@Test
public void testCreateSameDirectory() throws Exception {
- final AzureBlobFileSystem fs = getFileSystem();
- fs.mkdirs(new Path("a/b/c"));
- fs.mkdirs(new Path("a/b/c"));
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ fs.mkdirs(new Path("a/b/c"));
+ fs.mkdirs(new Path("a/b/c"));
- Assertions.assertThat(fs.exists(new Path("a/b/c")))
- .describedAs("Path a/b/c does not exist")
- .isTrue();
- // Asserting that directories created as explicit
- FileStatus status = fs.getAbfsStore().getFileStatus(fs.makeQualified(new Path("a/b/c")),
- new TracingContext(getTestTracingContext(fs, true)));
- Assertions.assertThat(status.isDirectory())
- .describedAs("Path a/b/c is not an explicit directory")
- .isTrue();
+ Assertions.assertThat(fs.exists(new Path("a/b/c")))
+ .describedAs("Path a/b/c does not exist")
+ .isTrue();
+ // Asserting that directories created as explicit
+ FileStatus status = fs.getAbfsStore()
+ .getFileStatus(fs.makeQualified(new Path("a/b/c")),
+ new TracingContext(getTestTracingContext(fs, true)));
+ Assertions.assertThat(status.isDirectory())
+ .describedAs("Path a/b/c is not an explicit directory")
+ .isTrue();
+ }
}
/**
@@ -1148,9 +1597,10 @@ public void testCreateSameDirectory() throws Exception {
*/
@Test
public void testCreateSamePathDirectory() throws Exception {
- final AzureBlobFileSystem fs = getFileSystem();
- fs.create(new Path("a"));
- intercept(IOException.class, () -> fs.mkdirs(new Path("a")));
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ fs.create(new Path("a"));
+ intercept(IOException.class, () -> fs.mkdirs(new Path("a")));
+ }
}
/**
@@ -1158,17 +1608,19 @@ public void testCreateSamePathDirectory() throws Exception {
*/
@Test
public void testMkdirOnRootAsParent() throws Exception {
- final AzureBlobFileSystem fs = getFileSystem();
- final Path path = new Path("a");
- fs.setWorkingDirectory(new Path("/"));
- fs.mkdirs(path);
-
- // Asserting that the directory created by mkdir exists as explicit.
- FileStatus status = fs.getAbfsStore().getFileStatus(fs.makeQualified(new Path("a")),
- new TracingContext(getTestTracingContext(fs, true)));
- Assertions.assertThat(status.isDirectory())
- .describedAs("Path a is not an explicit directory")
- .isTrue();
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ final Path path = new Path("a");
+ fs.setWorkingDirectory(new Path("/"));
+ fs.mkdirs(path);
+
+ // Asserting that the directory created by mkdir exists as explicit.
+ FileStatus status = fs.getAbfsStore()
+ .getFileStatus(fs.makeQualified(new Path("a")),
+ new TracingContext(getTestTracingContext(fs, true)));
+ Assertions.assertThat(status.isDirectory())
+ .describedAs("Path a is not an explicit directory")
+ .isTrue();
+ }
}
/**
@@ -1176,16 +1628,33 @@ public void testMkdirOnRootAsParent() throws Exception {
*/
@Test
public void testMkdirOnRoot() throws Exception {
- final AzureBlobFileSystem fs = getFileSystem();
- final Path path = new Path("/");
- fs.setWorkingDirectory(new Path("/"));
- fs.mkdirs(path);
-
- FileStatus status = fs.getAbfsStore().getFileStatus(fs.makeQualified(new Path("/")),
- new TracingContext(getTestTracingContext(fs, true)));
- Assertions.assertThat(status.isDirectory())
- .describedAs("Path is not an explicit directory")
- .isTrue();
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ final Path path = new Path("/");
+ fs.setWorkingDirectory(new Path("/"));
+ fs.mkdirs(path);
+
+ FileStatus status = fs.getAbfsStore()
+ .getFileStatus(fs.makeQualified(new Path("/")),
+ new TracingContext(getTestTracingContext(fs, true)));
+ Assertions.assertThat(status.isDirectory())
+ .describedAs("Path is not an explicit directory")
+ .isTrue();
+ }
+ }
+
+ /**
+ * Creation of file on path with unicode chars
+ */
+ @Test
+ public void testCreateUnicode() throws Exception {
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ final Path path = new Path("/file\u0031");
+ fs.create(path);
+
+ Assertions.assertThat(fs.exists(path))
+ .describedAs("Path with unicode does not exist")
+ .isTrue();
+ }
}
/**
@@ -1193,16 +1662,18 @@ public void testMkdirOnRoot() throws Exception {
*/
@Test
public void testMkdirUnicode() throws Exception {
- final AzureBlobFileSystem fs = getFileSystem();
- final Path path = new Path("/dir\u0031");
- fs.mkdirs(path);
-
- // Asserting that the directory created by mkdir exists as explicit.
- FileStatus status = fs.getAbfsStore().getFileStatus(fs.makeQualified(path),
- new TracingContext(getTestTracingContext(fs, true)));
- Assertions.assertThat(status.isDirectory())
- .describedAs("Path is not an explicit directory")
- .isTrue();
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ final Path path = new Path("/dir\u0031");
+ fs.mkdirs(path);
+
+ // Asserting that the directory created by mkdir exists as explicit.
+ FileStatus status = fs.getAbfsStore()
+ .getFileStatus(fs.makeQualified(path),
+ new TracingContext(getTestTracingContext(fs, true)));
+ Assertions.assertThat(status.isDirectory())
+ .describedAs("Path is not an explicit directory")
+ .isTrue();
+ }
}
/**
@@ -1210,33 +1681,35 @@ public void testMkdirUnicode() throws Exception {
*/
@Test
public void testMkdirParallelRequests() throws Exception {
- final AzureBlobFileSystem fs = getFileSystem();
- final Path path = new Path("/dir1");
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ final Path path = new Path("/dir1");
- ExecutorService es = Executors.newFixedThreadPool(3);
+ ExecutorService es = Executors.newFixedThreadPool(3);
- List> tasks = new ArrayList<>();
+ List> tasks = new ArrayList<>();
- for (int i = 0; i < 3; i++) {
- CompletableFuture future = CompletableFuture.runAsync(() -> {
- try {
- fs.mkdirs(path);
- } catch (IOException e) {
- throw new CompletionException(e);
- }
- }, es);
- tasks.add(future);
- }
+ for (int i = 0; i < 3; i++) {
+ CompletableFuture future = CompletableFuture.runAsync(() -> {
+ try {
+ fs.mkdirs(path);
+ } catch (IOException e) {
+ throw new CompletionException(e);
+ }
+ }, es);
+ tasks.add(future);
+ }
- // Wait for all the tasks to complete
- CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();
+ // Wait for all the tasks to complete
+ CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();
- // Assert that the directory created by mkdir exists as explicit
- FileStatus status = fs.getAbfsStore().getFileStatus(fs.makeQualified(path),
- new TracingContext(getTestTracingContext(fs, true)));
- Assertions.assertThat(status.isDirectory())
- .describedAs("Path is not an explicit directory")
- .isTrue();
+ // Assert that the directory created by mkdir exists as explicit
+ FileStatus status = fs.getAbfsStore()
+ .getFileStatus(fs.makeQualified(path),
+ new TracingContext(getTestTracingContext(fs, true)));
+ Assertions.assertThat(status.isDirectory())
+ .describedAs("Path is not an explicit directory")
+ .isTrue();
+ }
}
@@ -1248,16 +1721,18 @@ public void testMkdirParallelRequests() throws Exception {
public void testCreateSameDirectoryOverwriteFalse() throws Exception {
Configuration configuration = getRawConfiguration();
configuration.setBoolean(FS_AZURE_ENABLE_MKDIR_OVERWRITE, false);
- AzureBlobFileSystem fs1 = (AzureBlobFileSystem) FileSystem.newInstance(configuration);
- fs1.mkdirs(new Path("a/b/c"));
- fs1.mkdirs(new Path("a/b/c"));
-
- // Asserting that directories created as explicit
- FileStatus status = fs1.getAbfsStore().getFileStatus(fs1.makeQualified(new Path("a/b/c")),
- new TracingContext(getTestTracingContext(fs1, true)));
- Assertions.assertThat(status.isDirectory())
- .describedAs("Path is not an explicit directory")
- .isTrue();
+ try (AzureBlobFileSystem fs1 = (AzureBlobFileSystem) FileSystem.newInstance(configuration)) {
+ fs1.mkdirs(new Path("a/b/c"));
+ fs1.mkdirs(new Path("a/b/c"));
+
+ // Asserting that directories created as explicit
+ FileStatus status = fs1.getAbfsStore()
+ .getFileStatus(fs1.makeQualified(new Path("a/b/c")),
+ new TracingContext(getTestTracingContext(fs1, true)));
+ Assertions.assertThat(status.isDirectory())
+ .describedAs("Path is not an explicit directory")
+ .isTrue();
+ }
}
/**
@@ -1265,55 +1740,84 @@ public void testCreateSameDirectoryOverwriteFalse() throws Exception {
*/
@Test
public void testCreateDirectoryAndFileRecreation() throws Exception {
- final AzureBlobFileSystem fs = getFileSystem();
- fs.mkdirs(new Path("a/b/c"));
- fs.create(new Path("a/b/c/d"));
- Assertions.assertThat(fs.exists(new Path("a/b/c")))
- .describedAs("Directory a/b/c does not exist")
- .isTrue();
- Assertions.assertThat(fs.exists(new Path("a/b/c/d")))
- .describedAs("File a/b/c/d does not exist")
- .isTrue();
- intercept(IOException.class, () -> fs.mkdirs(new Path("a/b/c/d")));
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ fs.mkdirs(new Path("a/b/c"));
+ fs.create(new Path("a/b/c/d"));
+ Assertions.assertThat(fs.exists(new Path("a/b/c")))
+ .describedAs("Directory a/b/c does not exist")
+ .isTrue();
+ Assertions.assertThat(fs.exists(new Path("a/b/c/d")))
+ .describedAs("File a/b/c/d does not exist")
+ .isTrue();
+ intercept(IOException.class,
+ () -> fs.mkdirs(new Path("a/b/c/d")));
+ }
}
@Test
public void testCreateNonRecursiveForAtomicDirectoryFile() throws Exception {
- AzureBlobFileSystem fileSystem = getFileSystem();
- fileSystem.setWorkingDirectory(new Path("/"));
- fileSystem.mkdirs(new Path("/hbase/dir"));
- fileSystem.createFile(new Path("/hbase/dir/file"))
- .overwrite(false)
- .replication((short) 1)
- .bufferSize(1024)
- .blockSize(1024)
- .build();
- Assertions.assertThat(fileSystem.exists(new Path("/hbase/dir/file")))
- .describedAs("File /hbase/dir/file does not exist")
- .isTrue();
+ try (AzureBlobFileSystem fileSystem = getFileSystem()) {
+ fileSystem.setWorkingDirectory(new Path("/"));
+ fileSystem.mkdirs(new Path("/hbase/dir"));
+ fileSystem.createFile(new Path("/hbase/dir/file"))
+ .overwrite(false)
+ .replication((short) 1)
+ .bufferSize(1024)
+ .blockSize(1024)
+ .build();
+ Assertions.assertThat(fileSystem.exists(new Path("/hbase/dir/file")))
+ .describedAs("File /hbase/dir/file does not exist")
+ .isTrue();
+ }
+ }
+
+ /**
+ * Test creating a file on a non-existing path with an implicit parent directory.
+ * This test creates an implicit directory, then creates a file
+ * inside this implicit directory and asserts that it gets created.
+ *
+ * @throws Exception if any exception occurs during the test execution
+ */
+ @Test
+ public void testCreateOnNonExistingPathWithImplicitParentDir() throws Exception {
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ final Path implicitPath = new Path("dir1");
+ final Path path = new Path("dir1/dir2");
+ createAzCopyFolder(implicitPath);
+
+ // Creating a directory on non-existing path inside an implicit directory
+ fs.create(path);
+
+ // Asserting that path created by azcopy becomes explicit.
+ Assertions.assertThat(fs.exists(path))
+ .describedAs("File dir1/dir2 does not exist")
+ .isTrue();
+ }
}
@Test
public void testMkdirOnNonExistingPathWithImplicitParentDir() throws Exception {
- final AzureBlobFileSystem fs = getFileSystem();
- final Path implicitPath = new Path("dir1");
- final Path path = new Path("dir1/dir2");
- createAzCopyFolder(implicitPath);
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ final Path implicitPath = new Path("dir1");
+ final Path path = new Path("dir1/dir2");
+ createAzCopyFolder(implicitPath);
- // Creating a directory on non-existing path inside an implicit directory
- fs.mkdirs(path);
+ // Creating a directory on non-existing path inside an implicit directory
+ fs.mkdirs(path);
- // Asserting that path created by azcopy becomes explicit.
- Assertions.assertThat(DirectoryStateHelper.isExplicitDirectory(implicitPath,
- fs, getTestTracingContext(fs, true)))
- .describedAs("Path created by azcopy did not become explicit")
- .isTrue();
+ // Asserting that path created by azcopy becomes explicit.
+ Assertions.assertThat(
+ DirectoryStateHelper.isExplicitDirectory(implicitPath,
+ fs, getTestTracingContext(fs, true)))
+ .describedAs("Path created by azcopy did not become explicit")
+ .isTrue();
- // Asserting that the directory created by mkdir exists as explicit.
- Assertions.assertThat(DirectoryStateHelper.isExplicitDirectory(path,
- fs, getTestTracingContext(fs, true)))
- .describedAs("Directory created by mkdir does not exist as explicit")
- .isTrue();
+ // Asserting that the directory created by mkdir exists as explicit.
+ Assertions.assertThat(DirectoryStateHelper.isExplicitDirectory(path,
+ fs, getTestTracingContext(fs, true)))
+ .describedAs("Directory created by mkdir does not exist as explicit")
+ .isTrue();
+ }
}
/**
@@ -1323,30 +1827,32 @@ fs, getTestTracingContext(fs, true)))
*/
@Test
public void testMkdirOnExistingExplicitDirWithImplicitParentDir() throws Exception {
- final AzureBlobFileSystem fs = getFileSystem();
- final Path implicitPath = new Path("dir1");
- final Path path = new Path("dir1/dir2");
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ final Path implicitPath = new Path("dir1");
+ final Path path = new Path("dir1/dir2");
- // Creating implicit directory to be used as parent
- createAzCopyFolder(implicitPath);
+ // Creating implicit directory to be used as parent
+ createAzCopyFolder(implicitPath);
- // Creating an explicit directory on the path first
- fs.mkdirs(path);
+ // Creating an explicit directory on the path first
+ fs.mkdirs(path);
- // Creating a directory on existing explicit directory inside an implicit directory
- fs.mkdirs(path);
+ // Creating a directory on existing explicit directory inside an implicit directory
+ fs.mkdirs(path);
- // Asserting that path created by azcopy becomes explicit.
- Assertions.assertThat(DirectoryStateHelper.isExplicitDirectory(implicitPath,
- fs, getTestTracingContext(fs, true)))
- .describedAs("Path created by azcopy did not become explicit")
- .isTrue();
+ // Asserting that path created by azcopy becomes explicit.
+ Assertions.assertThat(
+ DirectoryStateHelper.isExplicitDirectory(implicitPath,
+ fs, getTestTracingContext(fs, true)))
+ .describedAs("Path created by azcopy did not become explicit")
+ .isTrue();
- // Asserting that the directory created by mkdir exists as explicit.
- Assertions.assertThat(DirectoryStateHelper.isExplicitDirectory(path,
- fs, getTestTracingContext(fs, true)))
- .describedAs("Directory created by mkdir does not exist as explicit")
- .isTrue();
+ // Asserting that the directory created by mkdir exists as explicit.
+ Assertions.assertThat(DirectoryStateHelper.isExplicitDirectory(path,
+ fs, getTestTracingContext(fs, true)))
+ .describedAs("Directory created by mkdir does not exist as explicit")
+ .isTrue();
+ }
}
/**
@@ -1356,29 +1862,31 @@ fs, getTestTracingContext(fs, true)))
*/
@Test
public void testMkdirOnExistingImplicitDirWithExplicitParentDir() throws Exception {
- final AzureBlobFileSystem fs = getFileSystem();
- final Path explicitPath = new Path("dir1");
- final Path path = new Path("dir1/dir2");
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ final Path explicitPath = new Path("dir1");
+ final Path path = new Path("dir1/dir2");
- // Creating an explicit directory to be used a parent
- fs.mkdirs(explicitPath);
+ // Creating an explicit directory to be used a parent
+ fs.mkdirs(explicitPath);
- createAzCopyFolder(path);
+ createAzCopyFolder(path);
- // Creating a directory on existing implicit directory inside an explicit directory
- fs.mkdirs(path);
+ // Creating a directory on existing implicit directory inside an explicit directory
+ fs.mkdirs(path);
- // Asserting that the directory created by mkdir exists as explicit.
- Assertions.assertThat(DirectoryStateHelper.isExplicitDirectory(explicitPath,
- fs, getTestTracingContext(fs, true)))
- .describedAs("Explicit parent directory does not exist as explicit")
- .isTrue();
+ // Asserting that the directory created by mkdir exists as explicit.
+ Assertions.assertThat(
+ DirectoryStateHelper.isExplicitDirectory(explicitPath,
+ fs, getTestTracingContext(fs, true)))
+ .describedAs("Explicit parent directory does not exist as explicit")
+ .isTrue();
- // Asserting that the directory created by mkdir exists as explicit.
- Assertions.assertThat(DirectoryStateHelper.isExplicitDirectory(path,
- fs, getTestTracingContext(fs, true)))
- .describedAs("Directory created by mkdir does not exist as explicit")
- .isTrue();
+ // Asserting that the directory created by mkdir exists as explicit.
+ Assertions.assertThat(DirectoryStateHelper.isImplicitDirectory(path,
+ fs, getTestTracingContext(fs, true)))
+ .describedAs("Mkdir created explicit directory")
+ .isTrue();
+ }
}
/**
@@ -1388,29 +1896,30 @@ fs, getTestTracingContext(fs, true)))
*/
@Test
public void testMkdirOnExistingImplicitDirWithImplicitParentDir() throws Exception {
- final AzureBlobFileSystem fs = getFileSystem();
- final Path implicitPath = new Path("dir3");
- final Path path = new Path("dir3/dir4");
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ final Path implicitPath = new Path("dir3");
+ final Path path = new Path("dir3/dir4");
- createAzCopyFolder(implicitPath);
+ createAzCopyFolder(implicitPath);
- // Creating an implicit directory on path
- createAzCopyFolder(path);
+ // Creating an implicit directory on path
+ createAzCopyFolder(path);
- // Creating a directory on existing implicit directory inside an implicit directory
- fs.mkdirs(path);
+ // Creating a directory on existing implicit directory inside an implicit directory
+ fs.mkdirs(path);
- // Asserting that path created by azcopy becomes explicit.
- Assertions.assertThat(DirectoryStateHelper.isExplicitDirectory(implicitPath,
- fs, getTestTracingContext(fs, true)))
- .describedAs("Path created by azcopy did not become explicit")
- .isTrue();
+ Assertions.assertThat(
+ DirectoryStateHelper.isImplicitDirectory(implicitPath,
+ fs, getTestTracingContext(fs, true)))
+ .describedAs("Marker is present for path created by azcopy")
+ .isTrue();
- // Asserting that the directory created by mkdir exists as explicit.
- Assertions.assertThat(DirectoryStateHelper.isExplicitDirectory(path,
- fs, getTestTracingContext(fs, true)))
- .describedAs("Directory created by mkdir does not exist as explicit")
- .isTrue();
+ // Asserting that the mkdir didn't create markers for existing directory.
+ Assertions.assertThat(DirectoryStateHelper.isImplicitDirectory(path,
+ fs, getTestTracingContext(fs, true)))
+ .describedAs("Marker is present for existing directory")
+ .isTrue();
+ }
}
/**
@@ -1420,32 +1929,27 @@ fs, getTestTracingContext(fs, true)))
*/
@Test
public void testMkdirOnExistingFileWithImplicitParentDir() throws Exception {
- final AzureBlobFileSystem fs = getFileSystem();
- final Path implicitPath = new Path("dir1");
- final Path path = new Path("dir1/dir2");
+ try (AzureBlobFileSystem fs = getFileSystem()) {
+ final Path implicitPath = new Path("dir1");
+ final Path path = new Path("dir1/dir2");
- createAzCopyFolder(implicitPath);
+ createAzCopyFolder(implicitPath);
- // Creating a file on path
- fs.create(path);
+ // Creating a file on path
+ fs.create(path);
- // Creating a directory on existing file inside an implicit directory
- // Asserting that the mkdir fails
- LambdaTestUtils.intercept(FileAlreadyExistsException.class, () -> {
- fs.mkdirs(path);
- });
-
- // Asserting that path created by azcopy becomes explicit.
- Assertions.assertThat(DirectoryStateHelper.isExplicitDirectory(implicitPath,
- fs, getTestTracingContext(fs, true)))
- .describedAs("Path created by azcopy did not become explicit")
- .isTrue();
+ // Creating a directory on existing file inside an implicit directory
+ // Asserting that the mkdir fails
+ LambdaTestUtils.intercept(FileAlreadyExistsException.class, () -> {
+ fs.mkdirs(path);
+ });
- // Asserting that the file still exists at path.
- Assertions.assertThat(DirectoryStateHelper.isExplicitDirectory(path,
- fs, getTestTracingContext(fs, true)))
- .describedAs("File still exists at path")
- .isFalse();
+ // Asserting that the file still exists at path.
+ Assertions.assertThat(DirectoryStateHelper.isExplicitDirectory(path,
+ fs, getTestTracingContext(fs, true)))
+ .describedAs("File still exists at path")
+ .isFalse();
+ }
}
/**
@@ -1457,35 +1961,40 @@ fs, getTestTracingContext(fs, true)))
@Test
public void testImplicitExplicitFolder() throws Exception {
Configuration configuration = Mockito.spy(getRawConfiguration());
- final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(configuration);
- final Path implicitPath = new Path("a/b/c");
+ try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(configuration)) {
+ final Path implicitPath = new Path("a/b/c");
- createAzCopyFolder(implicitPath);
+ createAzCopyFolder(implicitPath);
- Path path = makeQualified(new Path("a/b"));
- AbfsBlobClient blobClient = (AbfsBlobClient) fs.getAbfsStore().getClient(AbfsServiceType.BLOB);
- blobClient.createPath(path.toUri().getPath(), false, true,
- null, false, null, null, getTestTracingContext(fs, true),
- true);
+ Path path = makeQualified(new Path("a/b"));
+ AbfsBlobClient blobClient = (AbfsBlobClient) fs.getAbfsStore()
+ .getClient(AbfsServiceType.BLOB);
+ blobClient.createPathRestOp(path.toUri().getPath(), false, true,
+ false, null, null, getTestTracingContext(fs, true));
- fs.mkdirs(new Path("a/b/c/d"));
+ fs.mkdirs(new Path("a/b/c/d"));
- Assertions.assertThat(DirectoryStateHelper.isImplicitDirectory(new Path("a"),
- fs, getTestTracingContext(fs, true)))
- .describedAs("Directory 'a' should be implicit")
- .isTrue();
- Assertions.assertThat(DirectoryStateHelper.isExplicitDirectory(new Path("a/b"),
- fs, getTestTracingContext(fs, true)))
- .describedAs("Directory 'a/b' should be explicit")
- .isTrue();
- Assertions.assertThat(DirectoryStateHelper.isExplicitDirectory(new Path("a/b/c"),
- fs, getTestTracingContext(fs, true)))
- .describedAs("Directory 'a/b/c' should be explicit")
- .isTrue();
- Assertions.assertThat(DirectoryStateHelper.isExplicitDirectory(new Path("a/b/c/d"),
- fs, getTestTracingContext(fs, true)))
- .describedAs("Directory 'a/b/c/d' should be explicit")
- .isTrue();
+ Assertions.assertThat(
+ DirectoryStateHelper.isImplicitDirectory(new Path("a"),
+ fs, getTestTracingContext(fs, true)))
+ .describedAs("Directory 'a' should be implicit")
+ .isTrue();
+ Assertions.assertThat(
+ DirectoryStateHelper.isExplicitDirectory(new Path("a/b"),
+ fs, getTestTracingContext(fs, true)))
+ .describedAs("Directory 'a/b' should be explicit")
+ .isTrue();
+ Assertions.assertThat(
+ DirectoryStateHelper.isExplicitDirectory(new Path("a/b/c"),
+ fs, getTestTracingContext(fs, true)))
+ .describedAs("Directory 'a/b/c' should be explicit")
+ .isTrue();
+ Assertions.assertThat(
+ DirectoryStateHelper.isExplicitDirectory(new Path("a/b/c/d"),
+ fs, getTestTracingContext(fs, true)))
+ .describedAs("Directory 'a/b/c/d' should be explicit")
+ .isTrue();
+ }
}
/**
@@ -1497,40 +2006,46 @@ fs, getTestTracingContext(fs, true)))
@Test
public void testImplicitExplicitFolder1() throws Exception {
Configuration configuration = Mockito.spy(getRawConfiguration());
- final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(configuration);
- final Path implicitPath = new Path("a/b/c");
+ try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(configuration)) {
+ final Path implicitPath = new Path("a/b/c");
- createAzCopyFolder(implicitPath);
+ createAzCopyFolder(implicitPath);
- Path path = makeQualified(new Path("a"));
- AbfsBlobClient blobClient = (AbfsBlobClient) fs.getAbfsStore().getClient(AbfsServiceType.BLOB);
- blobClient.createPath(path.toUri().getPath(), false, true,
- null, false, null, null, getTestTracingContext(fs, true), true);
+ Path path = makeQualified(new Path("a"));
+ AbfsBlobClient blobClient = (AbfsBlobClient) fs.getAbfsStore()
+ .getClient(AbfsServiceType.BLOB);
+ blobClient.createPathRestOp(path.toUri().getPath(), false, true, false,
+ null, null, getTestTracingContext(fs, true));
- Path newPath = makeQualified(new Path("a/b/c"));
- blobClient.createPath(newPath.toUri().getPath(), false, true,
- null, false, null, null, getTestTracingContext(fs, true), true);
+ Path newPath = makeQualified(new Path("a/b/c"));
+ blobClient.createPathRestOp(newPath.toUri().getPath(), false, true,
+ false, null, null, getTestTracingContext(fs, true));
- fs.mkdirs(new Path("a/b/c/d"));
+ fs.mkdirs(new Path("a/b/c/d"));
- Assertions.assertThat(DirectoryStateHelper.isImplicitDirectory(new Path("a/b"),
- fs, getTestTracingContext(fs, true)))
- .describedAs("Directory 'a/b' should be implicit")
- .isTrue();
+ Assertions.assertThat(
+ DirectoryStateHelper.isImplicitDirectory(new Path("a/b"),
+ fs, getTestTracingContext(fs, true)))
+ .describedAs("Directory 'a/b' should be implicit")
+ .isTrue();
- // Asserting that the directory created by mkdir exists as explicit.
- Assertions.assertThat(DirectoryStateHelper.isExplicitDirectory(new Path("a"),
- fs, getTestTracingContext(fs, true)))
- .describedAs("Directory 'a' should be explicit")
- .isTrue();
- Assertions.assertThat(DirectoryStateHelper.isExplicitDirectory(new Path("a/b/c"),
- fs, getTestTracingContext(fs, true)))
- .describedAs("Directory 'a/b/c' should be explicit")
- .isTrue();
- Assertions.assertThat(DirectoryStateHelper.isExplicitDirectory(new Path("a/b/c/d"),
- fs, getTestTracingContext(fs, true)))
- .describedAs("Directory 'a/b/c/d' should be explicit")
- .isTrue();
+ // Asserting that the directory created by mkdir exists as explicit.
+ Assertions.assertThat(
+ DirectoryStateHelper.isExplicitDirectory(new Path("a"),
+ fs, getTestTracingContext(fs, true)))
+ .describedAs("Directory 'a' should be explicit")
+ .isTrue();
+ Assertions.assertThat(
+ DirectoryStateHelper.isExplicitDirectory(new Path("a/b/c"),
+ fs, getTestTracingContext(fs, true)))
+ .describedAs("Directory 'a/b/c' should be explicit")
+ .isTrue();
+ Assertions.assertThat(
+ DirectoryStateHelper.isExplicitDirectory(new Path("a/b/c/d"),
+ fs, getTestTracingContext(fs, true)))
+ .describedAs("Directory 'a/b/c/d' should be explicit")
+ .isTrue();
+ }
}
/**
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java
index 2a407b90947a3..af1e9e8496dc1 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java
@@ -60,7 +60,6 @@
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_LEASE_EXPIRED;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_NO_LEASE_ID_SPECIFIED;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_NO_LEASE_THREADS;
-import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_PARALLEL_ACCESS_DETECTED;
/**
* Test lease operations.
@@ -71,6 +70,8 @@ public class ITestAzureBlobFileSystemLease extends AbstractAbfsIntegrationTest {
private static final String TEST_FILE = "testfile";
private final boolean isHNSEnabled;
private static final int TEST_BYTES = 20;
+ private static final String PARALLEL_ACCESS = "Parallel access to the create path "
+ + "detected";
public ITestAzureBlobFileSystemLease() throws Exception {
super();
@@ -151,14 +152,15 @@ public void testTwoCreate() throws Exception {
fs.mkdirs(testFilePath.getParent());
try (FSDataOutputStream out = fs.create(testFilePath)) {
- LambdaTestUtils.intercept(IOException.class, isHNSEnabled ? ERR_PARALLEL_ACCESS_DETECTED
- : client instanceof AbfsBlobClient
- ? ERR_NO_LEASE_ID_SPECIFIED_BLOB
- : ERR_NO_LEASE_ID_SPECIFIED, () -> {
- try (FSDataOutputStream out2 = fs.create(testFilePath)) {
- }
- return "Expected second create on infinite lease dir to fail";
- });
+ LambdaTestUtils.intercept(IOException.class,
+ isHNSEnabled ? PARALLEL_ACCESS
+ : client instanceof AbfsBlobClient
+ ? ERR_NO_LEASE_ID_SPECIFIED_BLOB
+ : ERR_NO_LEASE_ID_SPECIFIED, () -> {
+ try (FSDataOutputStream out2 = fs.create(testFilePath)) {
+ }
+ return "Expected second create on infinite lease dir to fail";
+ });
}
Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java
index 0c50e279df27a..e54b98e0b7a6e 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java
@@ -145,7 +145,7 @@ public void testCreateDirOverwrite(boolean enableConditionalCreateOverwrite)
// One request to server
if (client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs)) {
- // 1 GetBlobProperties + 1 PutBlob call.
+ // 1 ListBlobs + 1 GetBlobProperties
mkdirRequestCount +=2;
} else {
mkdirRequestCount++;
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java
index 506eae7598668..7fb672920cd67 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java
@@ -112,7 +112,7 @@ public void checkCorrelationConfigValidation(String clientCorrelationId,
//request should not fail for invalid clientCorrelationID
AbfsRestOperation op = fs.getAbfsClient()
- .createPath(path, false, true, permissions, false, null, null,
+ .createPath(path, true, true, permissions, false, null, null,
tracingContext);
int statusCode = op.getResult().getStatusCode();
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
index 6f24015b4ff08..a786b77a7936c 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
@@ -452,8 +452,12 @@ public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance,
(currentAuthType == AuthType.SharedKey)
|| (currentAuthType == AuthType.OAuth));
- // TODO : [FnsOverBlob][HADOOP-19234] Update to work with Blob Endpoint as well when Fns Over Blob is ready.
- AbfsClient client = mock(AbfsDfsClient.class);
+ AbfsClient client;
+ if (AbfsServiceType.DFS.equals(abfsConfig.getFsConfiguredServiceType())) {
+ client = mock(AbfsDfsClient.class);
+ } else {
+ client = mock(AbfsBlobClient.class);
+ }
AbfsPerfTracker tracker = new AbfsPerfTracker(
"test",
abfsConfig.getAccountName(),