Skip to content

Commit

Permalink
NSFS | versioning | content directory versioning - GET/HEAD/DELETE op…
Browse files Browse the repository at this point in the history
…erations

Signed-off-by: nadav mizrahi <[email protected]>
  • Loading branch information
nadavMiz committed Feb 9, 2025
1 parent d9bfeea commit e7191fb
Show file tree
Hide file tree
Showing 2 changed files with 180 additions and 45 deletions.
112 changes: 72 additions & 40 deletions src/sdk/namespace_fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -575,9 +575,9 @@ class NamespaceFS {
* @param {string} file_path
* @param {object} err
*/
_should_update_issues_report(params, file_path, err) {
async _should_update_issues_report(fs_context, params, file_path, err) {
const { key } = params;
const md_file_path = this._get_file_md_path({ key });
const md_file_path = await this._get_file_md_path(fs_context, { key });
const non_internal_path = file_path === md_file_path;
const no_such_key_condition = err.code === `ENOENT` && non_internal_path;
return !no_such_key_condition;
Expand Down Expand Up @@ -944,8 +944,8 @@ class NamespaceFS {
file_path = await this._find_version_path(fs_context, params, true);
await this._check_path_in_bucket_boundaries(fs_context, file_path);
await this._load_bucket(params, fs_context);
stat = await nb_native().fs.stat(fs_context, file_path);
isDir = native_fs_utils.isDirectory(stat);
stat = await nb_native().fs.stat(fs_context, file_path);
isDir = native_fs_utils.isDirectory(stat);
if (isDir) {
if (!stat.xattr?.[XATTR_DIR_CONTENT] || !params.key.endsWith('/')) {
throw error_utils.new_error_code('ENOENT', 'NoSuchKey');
Expand All @@ -972,7 +972,7 @@ class NamespaceFS {
this._throw_if_delete_marker(stat, params);
return this._get_object_info(params.bucket, params.key, stat, isDir);
} catch (err) {
if (this._should_update_issues_report(params, file_path, err)) {
if (await this._should_update_issues_report(fs_context, params, file_path, err)) {
this.run_update_issues_report(object_sdk, err);
}
throw native_fs_utils.translate_error_codes(err, native_fs_utils.entity_enum.OBJECT);
Expand All @@ -983,7 +983,7 @@ class NamespaceFS {
const is_dir_content = this._is_directory_content(file_path, params.key);
if (is_dir_content) {
try {
const md_path = this._get_file_md_path(params);
const md_path = await this._get_file_md_path(fs_context, params);
const dir_stat = await nb_native().fs.stat(fs_context, md_path);
if (dir_stat && dir_stat.xattr[XATTR_DIR_CONTENT] === '0') return true;
} catch (err) {
Expand All @@ -1008,20 +1008,20 @@ class NamespaceFS {
let stat;
for (;;) {
try {
file_path = await this._find_version_path(fs_context, params);
await this._check_path_in_bucket_boundaries(fs_context, file_path);
file_path = await this._find_version_path(fs_context, params);
await this._check_path_in_bucket_boundaries(fs_context, file_path);

// NOTE: don't move this code after the open
// this can lead to ENOENT failures due to file not exists when content size is 0
// if entry is a directory object and its content size = 0 - return empty response
// NOTE: don't move this code after the open
// this can lead to ENOENT failures due to file not exists when content size is 0
// if entry is a directory object and its content size = 0 - return empty response
if (await this._is_empty_directory_content(file_path, fs_context, params)) return null;

file = await nb_native().fs.open(
fs_context,
file_path,
config.NSFS_OPEN_READ_MODE,
native_fs_utils.get_umasked_mode(config.BASE_MODE_FILE),
);
file = await nb_native().fs.open(
fs_context,
file_path,
config.NSFS_OPEN_READ_MODE,
native_fs_utils.get_umasked_mode(config.BASE_MODE_FILE),
);
stat = await file.stat(fs_context);
if (this._is_mismatch_version_id(stat, params.version_id)) {
dbg.warn('NamespaceFS.read_object_stream mismatch version_id', params.version_id, this._get_version_id_by_xattr(stat));
Expand Down Expand Up @@ -1420,7 +1420,7 @@ class NamespaceFS {
await this._assign_dir_content_to_xattr(fs_context, fs_xattr, params, copy_xattr);
// when .folder exist and it's no upload flow - .folder should be deleted if it exists
await native_fs_utils.unlink_ignore_enoent(fs_context, file_path);
const dir_path = this._get_file_md_path(params);
const dir_path = this._get_directory_path(params);
const stat = await nb_native().fs.stat(fs_context, dir_path);
const upload_info = this._get_upload_info(stat, fs_xattr[XATTR_VERSION_ID]);
return upload_info;
Expand Down Expand Up @@ -1968,10 +1968,8 @@ class NamespaceFS {
await this._check_path_in_bucket_boundaries(fs_context, file_path);
dbg.log0('NamespaceFS: delete_object', file_path);
let res;
const is_key_dir_path = await this._is_key_dir_path(fs_context, params.key);
if (this._is_versioning_disabled() || is_key_dir_path) {
if (this._is_versioning_disabled() || this.should_use_empty_content_dir_optimization()) {
// TODO- Directory object (key/) is currently can't co-exist while key (without slash) exists. see -https://github.com/noobaa/noobaa-core/issues/8320
// Also, Directory object (key/) is currently not supported combined with versioning - see - https://github.com/noobaa/noobaa-core/issues/8531
await this._delete_single_object(fs_context, file_path, params);
} else {
res = params.version_id ?
Expand Down Expand Up @@ -2032,7 +2030,7 @@ class NamespaceFS {
// when deleting the data of a directory object, we need to remove the directory dir object xattr
// if the dir still exists - occurs when deleting dir while the dir still has entries in it
if (this._is_directory_content(file_path, params.key)) {
await this._clear_user_xattr(fs_context, this._get_file_md_path(params), XATTR_USER_PREFIX);
await this._clear_user_xattr(fs_context, await this._get_file_md_path(fs_context, params), XATTR_USER_PREFIX);
}
}

Expand Down Expand Up @@ -2064,7 +2062,7 @@ class NamespaceFS {
if (params.version_id) {
file_path = await this._find_version_path(fs_context, params, true);
} else {
file_path = this._get_file_md_path(params);
file_path = await this._get_file_md_path(fs_context, params);
}
try {
dbg.log0('NamespaceFS.get_object_tagging: param ', params, 'file_path :', file_path);
Expand Down Expand Up @@ -2282,11 +2280,19 @@ class NamespaceFS {
return p.endsWith('/') ? p + config.NSFS_FOLDER_OBJECT_NAME : p;
}

_get_file_md_path({ key }) {
async _get_file_md_path(fs_context, { key }) {
const p = this._get_file_path({ key });
// when the key refers to a directory (trailing /) with the disabled format (xattr are set on the dir)
// we want to return the actual directory path
return (await this._is_disabled_content_dir(fs_context, p, key)) ?
path.join(path.dirname(p), '/') : p;
}

_get_directory_path({ key }) {
const p = this._get_file_path({ key });
// when the key refers to a directory (trailing /) but we would like to return the md path
// we return the parent directory of .folder
return this._is_directory_content(p, key) ? path.join(path.dirname(p), '/') : p;
return path.join(path.dirname(p), '/');
}

_assign_md5_to_fs_xattr(md5_digest, fs_xattr) {
Expand Down Expand Up @@ -2366,7 +2372,7 @@ class NamespaceFS {
* existing xattr starting with XATTR_USER_PREFIX will be cleared
*/
async _assign_dir_content_to_xattr(fs_context, fs_xattr, params, copy_xattr) {
const dir_path = this._get_file_md_path(params);
const dir_path = this._get_directory_path(params);
fs_xattr = Object.assign(fs_xattr || {}, {
[XATTR_DIR_CONTENT]: params.size || 0
});
Expand All @@ -2385,6 +2391,20 @@ class NamespaceFS {
return (file_path && file_path.endsWith(config.NSFS_FOLDER_OBJECT_NAME)) && (key && key.endsWith('/'));
}

/**
*
* @param {*} fs_context
* @param {string} file_path
* @returns {Promise<boolean>}
*/
async _is_disabled_content_dir(fs_context, file_path, key) {
if (this._is_directory_content(file_path, key)) {
const stat = await native_fs_utils.stat_ignore_enoent(fs_context, path.join(path.dirname(file_path), '/'));
return Boolean(stat?.xattr[XATTR_DIR_CONTENT]);
}
return false;
}

/**
* @param {string} dir_key
* @param {fs.Dirent} ent
Expand Down Expand Up @@ -2816,27 +2836,28 @@ class NamespaceFS {
/**
* _find_version_path returns the path of the version
* 1. if version_id is not defined, it returns the key file
* 2. else,
* 2. else,
* 2.1. check version format
* 2.2. check if the latest version exists and it matches the version_id parameter the latest version path returns
* 2.3. else, return the version path under .versions/
* @param {import('./nb').NativeFSContext} fs_context
* 2.3. else, return the version path under .versions/
* @param {import('./nb').NativeFSContext} fs_context
* @param {{key: string, version_id?: string}} params
* @param {boolean} [return_md_path]
* @param {boolean} [return_md_path]
* @returns {Promise<string>}
*/
async _find_version_path(fs_context, { key, version_id }, return_md_path) {
const cur_ver_path = return_md_path ? this._get_file_md_path({ key }) : this._get_file_path({ key });
const cur_ver_path = return_md_path ? await this._get_file_md_path(fs_context, { key }) : this._get_file_path({ key });
if (!version_id) return cur_ver_path;

this._throw_if_wrong_version_format(version_id);
const cur_ver_info = await this._get_version_info(fs_context, cur_ver_path);
if (cur_ver_info && cur_ver_info.version_id_str === version_id) return cur_ver_path;

const versioned_path = this._get_version_path(key, version_id);
const isDir = this._is_directory_content(cur_ver_path, key);
const versioned_path = this._get_version_path(key, version_id, isDir);
return versioned_path;
}
/**
/**
* _is_key_dir_path will check if key is pointing to a directory or a file
* @param {nb.NativeFSContext} fs_context
* @param {string} key
Expand Down Expand Up @@ -2937,6 +2958,9 @@ class NamespaceFS {
await native_fs_utils.unlink_ignore_enoent(fs_context, file_path);
await this._check_version_moved(fs_context, key, version_id);
}
if (await this._is_disabled_content_dir(fs_context, latest_version_path, key)) {
await this._clear_user_xattr(fs_context, path.join(path.dirname(latest_version_path), '/'), XATTR_DIR_CONTENT);
}
return version_info;
} catch (err) {
dbg.warn(`NamespaceFS._delete_single_object_versioned error: retries=${retries} file_path=${file_path}`, err);
Expand Down Expand Up @@ -3083,9 +3107,11 @@ class NamespaceFS {
dbg.log0('Namespace_fs._delete_latest_version:', latest_ver_path, params);

let gpfs_options;
const is_dir = this._is_directory_content(latest_ver_path, params.key);
const is_gpfs = native_fs_utils._is_gpfs(fs_context);
let retries = config.NSFS_RENAME_RETRIES;
let latest_ver_info;
let versioned_path;
for (;;) {
try {
// TODO get latest version from file in POSIX like in GPFS path
Expand All @@ -3099,7 +3125,7 @@ class NamespaceFS {
latest_ver_info = latest_fd && await this._get_version_info(fs_context, undefined, latest_fd);
if (!latest_ver_info) break;
}
const versioned_path = this._get_version_path(params.key, latest_ver_info.version_id_str);
versioned_path = this._get_version_path(params.key, latest_ver_info.version_id_str, is_dir);

const suspended_and_latest_is_not_null = this._is_versioning_suspended() &&
latest_ver_info.version_id_str !== NULL_VERSION_ID;
Expand All @@ -3119,6 +3145,9 @@ class NamespaceFS {
gpfs_options?.delete_version, bucket_tmp_dir_path);
}
}
if (is_dir) {
await this._handle_latest_disabled_dir_content_xattr(fs_context, params.key, versioned_path, latest_ver_path);
}
break;
} catch (err) {
dbg.warn(`NamespaceFS._delete_latest_version error: retries=${retries} latest_ver_path=${latest_ver_path}`, err);
Expand All @@ -3130,7 +3159,7 @@ class NamespaceFS {
}
}
// create delete marker and move it to .versions/key_{delete_marker_version_id}
const created_version_id = await this._create_delete_marker(fs_context, params, latest_ver_info);
const created_version_id = await this._create_delete_marker(fs_context, params, latest_ver_info, is_dir);
return {
created_delete_marker: true,
created_version_id
Expand Down Expand Up @@ -3169,7 +3198,7 @@ class NamespaceFS {
}

// TODO: support GPFS
async _create_delete_marker(fs_context, params, deleted_version_info) {
async _create_delete_marker(fs_context, params, deleted_version_info, is_dir) {
let retries = config.NSFS_RENAME_RETRIES;
let upload_params;
let delete_marker_version_id;
Expand All @@ -3185,7 +3214,7 @@ class NamespaceFS {
// the delete marker file name would be with a 'null' suffix
delete_marker_version_id = NULL_VERSION_ID;
}
const file_path = this._get_version_path(params.key, delete_marker_version_id);
const file_path = this._get_version_path(params.key, delete_marker_version_id, is_dir);

const fs_xattr = await this._assign_versions_to_fs_xattr(stat, undefined, true);
if (fs_xattr) await upload_params.target_file.replacexattr(fs_context, fs_xattr);
Expand All @@ -3210,14 +3239,17 @@ class NamespaceFS {

// try find prev version by hint or by iterating on .versions/ dir
async find_max_version_past(fs_context, key) {
const versions_dir = path.normalize(path.join(this.bucket_path, path.dirname(key), HIDDEN_VERSIONS_PATH));
const is_dir = await this._is_key_dir_path(fs_context, key);
const key_name = is_dir ? config.NSFS_FOLDER_OBJECT_NAME : path.basename(key);
const dir_name = is_dir ? key : path.dirname(key);
const versions_dir = path.normalize(path.join(this.bucket_path, dir_name, HIDDEN_VERSIONS_PATH));
try {
const versions = await nb_native().fs.readdir(fs_context, versions_dir);
const arr = await P.map_with_concurrency(10, versions, async entry => {
const index = entry.name.endsWith('_null') ? entry.name.lastIndexOf('_null') : entry.name.lastIndexOf('_mtime-');
// don't fail if version entry name is invalid, just keep searching
if (index < 0 || entry.name.slice(0, index) !== key) return undefined;
const { mtimeNsBigint } = this._extract_version_info_from_xattr(entry.name.slice(key.length + 1)) ||
if (index < 0 || entry.name.slice(0, index) !== key_name) return undefined;
const { mtimeNsBigint } = this._extract_version_info_from_xattr(entry.name.slice(key_name.length + 1)) ||
(await this._get_version_info(fs_context, path.join(versions_dir, entry.name)));
return { mtimeNsBigint, name: entry.name };
});
Expand Down
Loading

0 comments on commit e7191fb

Please sign in to comment.