Skip to content

Commit

Permalink
[BugFix] Fix retry upload failure when backup files using HDFS filesy…
Browse files Browse the repository at this point in the history
…stem interface (#53679)

Signed-off-by: srlch <[email protected]>
  • Loading branch information
srlch authored Dec 27, 2024
1 parent 28e477e commit 2d76f88
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 8 deletions.
11 changes: 3 additions & 8 deletions be/src/fs/hdfs/fs_hdfs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -571,25 +571,20 @@ StatusOr<std::unique_ptr<WritableFile>> HdfsFileSystem::new_writable_file(const
std::shared_ptr<HdfsFsClient> hdfs_client;
RETURN_IF_ERROR(HdfsFsCache::instance()->get_connection(namenode, hdfs_client, _options));
int flags = O_WRONLY;
if (opts.mode == FileSystem::CREATE_OR_OPEN_WITH_TRUNCATE) {
if (auto st = _path_exists(hdfs_client->hdfs_fs, path); st.ok()) {
return Status::NotSupported(fmt::format("Cannot truncate a file by hdfs writer, path={}", path));
}
} else if (opts.mode == MUST_CREATE) {
// O_WRONLY means create or overwrite for hdfsOpenFile, which is exactly CREATE_OR_OPEN_WITH_TRUNCATE
if (opts.mode == MUST_CREATE) {
if (auto st = _path_exists(hdfs_client->hdfs_fs, path); st.ok()) {
return Status::AlreadyExist(path);
}
} else if (opts.mode == MUST_EXIST) {
return Status::NotSupported("Open with MUST_EXIST not supported by hdfs writer");
} else if (opts.mode == CREATE_OR_OPEN) {
return Status::NotSupported("Open with CREATE_OR_OPEN not supported by hdfs writer");
} else {
} else if (opts.mode != CREATE_OR_OPEN_WITH_TRUNCATE) {
auto msg = strings::Substitute("Unsupported open mode $0", opts.mode);
return Status::NotSupported(msg);
}

flags |= O_CREAT;

// `io.file.buffer.size` of https://apache.github.io/hadoop/hadoop-project-dist/hadoop-common/core-default.xml
int hdfs_write_buffer_size = 0;
// pass zero to hdfsOpenFile will use the default hdfs_write_buffer_size
Expand Down
50 changes: 50 additions & 0 deletions be/test/fs/fs_hdfs_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,54 @@ TEST_F(HdfsFileSystemTest, create_file_and_destroy) {
thread.join();
}

TEST_F(HdfsFileSystemTest, create_file_with_open_truncate) {
auto fs = new_fs_hdfs(FSOptions());
const std::string filepath = "file://" + _root_path + "/create_file_with_open_truncate";
auto st = fs->path_exists(filepath);
EXPECT_TRUE(st.is_not_found());
std::string str1 = "123";
std::string str2 = "456";
Slice data1(str1);
Slice data2(str2);

WritableFileOptions opts{.sync_on_close = false, .mode = FileSystem::CREATE_OR_OPEN_WITH_TRUNCATE};

auto wfile_1 = fs->new_writable_file(opts, filepath);
EXPECT_TRUE(wfile_1.ok());
EXPECT_TRUE((*wfile_1)->append(data1).ok());
EXPECT_TRUE((*wfile_1)->flush(WritableFile::FlushMode::FLUSH_SYNC).ok());
EXPECT_TRUE((*wfile_1)->sync().ok());
(*wfile_1)->close();

st = fs->path_exists(filepath);
EXPECT_TRUE(st.ok());

auto open_res1 = fs->new_random_access_file(filepath);
EXPECT_TRUE(open_res1.ok());
auto rfile1 = std::move(open_res1.value());
auto read_res_1 = rfile1->read_all();
EXPECT_TRUE(read_res_1.ok());
EXPECT_TRUE(read_res_1.value() == str1);

auto wfile_2 = fs->new_writable_file(opts, filepath);
EXPECT_TRUE(wfile_2.ok());
EXPECT_TRUE((*wfile_2)->append(data2).ok());
EXPECT_TRUE((*wfile_2)->flush(WritableFile::FlushMode::FLUSH_SYNC).ok());
EXPECT_TRUE((*wfile_2)->sync().ok());
(*wfile_2)->close();

st = fs->path_exists(filepath);
EXPECT_TRUE(st.ok());

auto open_res2 = fs->new_random_access_file(filepath);
EXPECT_TRUE(open_res2.ok());
auto rfile2 = std::move(open_res2.value());
auto read_res_2 = rfile2->read_all();
EXPECT_TRUE(read_res_2.ok());
EXPECT_TRUE(read_res_2.value() == str2);

(*wfile_1).reset();
(*wfile_2).reset();
}

} // namespace starrocks

0 comments on commit 2d76f88

Please sign in to comment.