diff --git a/be/src/fs/hdfs/fs_hdfs.cpp b/be/src/fs/hdfs/fs_hdfs.cpp index c448a322fc7a45..52ddba2693a3de 100644 --- a/be/src/fs/hdfs/fs_hdfs.cpp +++ b/be/src/fs/hdfs/fs_hdfs.cpp @@ -571,11 +571,8 @@ StatusOr> HdfsFileSystem::new_writable_file(const std::shared_ptr hdfs_client; RETURN_IF_ERROR(HdfsFsCache::instance()->get_connection(namenode, hdfs_client, _options)); int flags = O_WRONLY; - if (opts.mode == FileSystem::CREATE_OR_OPEN_WITH_TRUNCATE) { - if (auto st = _path_exists(hdfs_client->hdfs_fs, path); st.ok()) { - return Status::NotSupported(fmt::format("Cannot truncate a file by hdfs writer, path={}", path)); - } - } else if (opts.mode == MUST_CREATE) { + // O_WRONLY means create or overwrite for hdfsOpenFile, which is exactly CREATE_OR_OPEN_WITH_TRUNCATE + if (opts.mode == MUST_CREATE) { if (auto st = _path_exists(hdfs_client->hdfs_fs, path); st.ok()) { return Status::AlreadyExist(path); } @@ -583,13 +580,11 @@ StatusOr> HdfsFileSystem::new_writable_file(const return Status::NotSupported("Open with MUST_EXIST not supported by hdfs writer"); } else if (opts.mode == CREATE_OR_OPEN) { return Status::NotSupported("Open with CREATE_OR_OPEN not supported by hdfs writer"); - } else { + } else if (opts.mode != CREATE_OR_OPEN_WITH_TRUNCATE) { auto msg = strings::Substitute("Unsupported open mode $0", opts.mode); return Status::NotSupported(msg); } - flags |= O_CREAT; - // `io.file.buffer.size` of https://apache.github.io/hadoop/hadoop-project-dist/hadoop-common/core-default.xml int hdfs_write_buffer_size = 0; // pass zero to hdfsOpenFile will use the default hdfs_write_buffer_size diff --git a/be/test/fs/fs_hdfs_test.cpp b/be/test/fs/fs_hdfs_test.cpp index d05cf0aa8dca49..3716717f3b80ec 100644 --- a/be/test/fs/fs_hdfs_test.cpp +++ b/be/test/fs/fs_hdfs_test.cpp @@ -76,4 +76,54 @@ TEST_F(HdfsFileSystemTest, create_file_and_destroy) { thread.join(); } +TEST_F(HdfsFileSystemTest, create_file_with_open_truncate) { + auto fs = new_fs_hdfs(FSOptions()); + const std::string filepath = "file://" + _root_path + "/create_file_with_open_truncate"; + auto st = fs->path_exists(filepath); + EXPECT_TRUE(st.is_not_found()); + std::string str1 = "123"; + std::string str2 = "456"; + Slice data1(str1); + Slice data2(str2); + + WritableFileOptions opts{.sync_on_close = false, .mode = FileSystem::CREATE_OR_OPEN_WITH_TRUNCATE}; + + auto wfile_1 = fs->new_writable_file(opts, filepath); + EXPECT_TRUE(wfile_1.ok()); + EXPECT_TRUE((*wfile_1)->append(data1).ok()); + EXPECT_TRUE((*wfile_1)->flush(WritableFile::FlushMode::FLUSH_SYNC).ok()); + EXPECT_TRUE((*wfile_1)->sync().ok()); + (*wfile_1)->close(); + + st = fs->path_exists(filepath); + EXPECT_TRUE(st.ok()); + + auto open_res1 = fs->new_random_access_file(filepath); + EXPECT_TRUE(open_res1.ok()); + auto rfile1 = std::move(open_res1.value()); + auto read_res_1 = rfile1->read_all(); + EXPECT_TRUE(read_res_1.ok()); + EXPECT_TRUE(read_res_1.value() == str1); + + auto wfile_2 = fs->new_writable_file(opts, filepath); + EXPECT_TRUE(wfile_2.ok()); + EXPECT_TRUE((*wfile_2)->append(data2).ok()); + EXPECT_TRUE((*wfile_2)->flush(WritableFile::FlushMode::FLUSH_SYNC).ok()); + EXPECT_TRUE((*wfile_2)->sync().ok()); + (*wfile_2)->close(); + + st = fs->path_exists(filepath); + EXPECT_TRUE(st.ok()); + + auto open_res2 = fs->new_random_access_file(filepath); + EXPECT_TRUE(open_res2.ok()); + auto rfile2 = std::move(open_res2.value()); + auto read_res_2 = rfile2->read_all(); + EXPECT_TRUE(read_res_2.ok()); + EXPECT_TRUE(read_res_2.value() == str2); + + (*wfile_1).reset(); + (*wfile_2).reset(); +} + } // namespace starrocks