From 6c9a0d0830f7f21761f500d47c27e9838edea01e Mon Sep 17 00:00:00 2001
From: "github-actions[bot]"
 <41898282+github-actions[bot]@users.noreply.github.com>
Date: Tue, 14 Jan 2025 17:09:09 +0800
Subject: [PATCH] [#6131] feat (gvfs-fuse): Add integration test framework for
 gvfs-fuse (#6225)

### What changes were proposed in this pull request?

Add an integration test framework for gvfs-fuse.
Integrate LocalStack into the gvfs-fuse integration tests.
Add a CI pipeline for the integration tests.

### Why are the changes needed?

Fix: #6131

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Integration tests (IT), run locally and in CI via the new `make` targets shown below.
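
A minimal sketch of a local run, assuming a Gravitino distribution has already been built (`./gradlew compileDistribution`) and Docker is available for LocalStack:

```bash
cd clients/filesystem-fuse
# S3 storage tests against LocalStack (runs `cargo test s3_ut_ --lib`)
make test-s3
# End-to-end FUSE tests: starts LocalStack, the Gravitino server and
# gvfs-fuse, then runs `cargo test --test fuse_test fuse_it_`
make test-fuse-it
```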

Co-authored-by: Yuhui <hui@datastrato.com>
---
 .github/workflows/gvfs-fuse-build-test.yml    |  14 ++-
 clients/filesystem-fuse/Makefile              |   6 +
 .../src/default_raw_filesystem.rs             |  15 ++-
 .../filesystem-fuse/src/gravitino_client.rs   |  26 +++-
 .../src/gravitino_fileset_filesystem.rs       |  81 +++++++++++-
 clients/filesystem-fuse/src/gvfs_creator.rs   |  10 +-
 clients/filesystem-fuse/src/lib.rs            |  13 ++
 .../src/open_dal_filesystem.rs                |  47 +++++--
 clients/filesystem-fuse/src/s3_filesystem.rs  | 113 +++++++++--------
 clients/filesystem-fuse/tests/bin/env.sh      |  65 ++++++++++
 .../tests/bin/gravitino_server.sh             | 116 ++++++++++++++++++
 .../filesystem-fuse/tests/bin/gvfs_fuse.sh    |  65 ++++++++++
 .../filesystem-fuse/tests/bin/localstatck.sh  |  46 +++++++
 .../tests/bin/run_fuse_testers.sh             |  70 +++++++++++
 .../tests/bin/run_s3fs_testers.sh             |  64 ++++++++++
 .../tests/conf/gvfs_fuse_s3.toml              |   3 +-
 clients/filesystem-fuse/tests/fuse_test.rs    |  22 ++--
 17 files changed, 696 insertions(+), 80 deletions(-)
 create mode 100644 clients/filesystem-fuse/tests/bin/env.sh
 create mode 100644 clients/filesystem-fuse/tests/bin/gravitino_server.sh
 create mode 100644 clients/filesystem-fuse/tests/bin/gvfs_fuse.sh
 create mode 100644 clients/filesystem-fuse/tests/bin/localstatck.sh
 create mode 100755 clients/filesystem-fuse/tests/bin/run_fuse_testers.sh
 create mode 100644 clients/filesystem-fuse/tests/bin/run_s3fs_testers.sh

diff --git a/.github/workflows/gvfs-fuse-build-test.yml b/.github/workflows/gvfs-fuse-build-test.yml
index 4af01d82da3..4fe7b66e09d 100644
--- a/.github/workflows/gvfs-fuse-build-test.yml
+++ b/.github/workflows/gvfs-fuse-build-test.yml
@@ -71,10 +71,18 @@ jobs:
         run: |
           dev/ci/check_commands.sh
 
-      - name: Build and test Gravitino
+      - name: Build Gvfs-fuse
         run: |
            ./gradlew :clients:filesystem-fuse:build -PenableFuse=true
 
+      - name: Integration test
+        run: |
+          ./gradlew build -x :clients:client-python:build -x test -x web -PjdkVersion=${{ matrix.java-version }}
+          ./gradlew compileDistribution -x :clients:client-python:build -x test -x web -PjdkVersion=${{ matrix.java-version }}
+          cd clients/filesystem-fuse
+          make test-s3
+          make test-fuse-it
+
       - name: Free up disk space
         run: |
           dev/ci/util_free_space.sh
@@ -85,5 +93,7 @@ jobs:
         with:
           name: Gvfs-fuse integrate-test-reports-${{ matrix.java-version }}
           path: |
-            clients/filesystem-fuse/build/test/log/*.log
+            clients/filesystem-fuse/target/debug/fuse.log
+            distribution/package/logs/gravitino-server.out
+            distribution/package/logs/gravitino-server.log
 
diff --git a/clients/filesystem-fuse/Makefile b/clients/filesystem-fuse/Makefile
index f4a4cef20ae..86dd2f22152 100644
--- a/clients/filesystem-fuse/Makefile
+++ b/clients/filesystem-fuse/Makefile
@@ -62,6 +62,12 @@ doc-test:
 unit-test: doc-test
 	cargo test --no-fail-fast --lib --all-features --workspace
 
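+# End-to-end FUSE integration tests: run_fuse_testers.sh starts LocalStack,
+# the Gravitino server and gvfs-fuse, then runs the fuse_it_ test cases.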
+test-fuse-it:
+	@bash ./tests/bin/run_fuse_testers.sh test
+
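+# S3 storage tests: run_s3fs_testers.sh starts LocalStack and runs the
+# s3_ut_ unit tests with RUN_TEST_WITH_S3 set.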
+test-s3:
+	@bash ./tests/bin/run_s3fs_testers.sh test
+
 test: doc-test
 	cargo test --no-fail-fast --all-targets --all-features --workspace
 
diff --git a/clients/filesystem-fuse/src/default_raw_filesystem.rs b/clients/filesystem-fuse/src/default_raw_filesystem.rs
index 944181246d5..d1d8e7605df 100644
--- a/clients/filesystem-fuse/src/default_raw_filesystem.rs
+++ b/clients/filesystem-fuse/src/default_raw_filesystem.rs
@@ -334,13 +334,22 @@ impl<T: PathFileSystem> RawFileSystem for DefaultRawFileSystem<T> {
         file.flush().await
     }
 
-    async fn close_file(&self, _file_id: u64, fh: u64) -> Result<()> {
+    async fn close_file(&self, file_id: u64, fh: u64) -> Result<()> {
+        let file_entry = self.get_file_entry(file_id).await;
+
         let opened_file = self
             .opened_file_manager
             .remove(fh)
             .ok_or(Errno::from(libc::EBADF))?;
-        let mut file = opened_file.lock().await;
-        file.close().await
+
+        // TODO: handle the race condition and the corner case where the file
+        // has been deleted while still open.
+        if file_entry.is_ok() {
+            let mut file = opened_file.lock().await;
+            file.close().await
+        } else {
+            // If the file has already been deleted, skipping close() does not
+            // leak resources.
+            Ok(())
+        }
     }
 
     async fn read(&self, file_id: u64, fh: u64, offset: u64, size: u32) -> Result<Bytes> {
diff --git a/clients/filesystem-fuse/src/gravitino_client.rs b/clients/filesystem-fuse/src/gravitino_client.rs
index 9bdfbb2c288..1e1cd411eac 100644
--- a/clients/filesystem-fuse/src/gravitino_client.rs
+++ b/clients/filesystem-fuse/src/gravitino_client.rs
@@ -199,10 +199,34 @@ impl GravitinoClient {
 }
 
 #[cfg(test)]
-mod tests {
+pub(crate) mod tests {
     use super::*;
     use mockito::mock;
 
+    pub(crate) fn create_test_catalog(
+        name: &str,
+        provider: &str,
+        properties: HashMap<String, String>,
+    ) -> Catalog {
+        Catalog {
+            name: name.to_string(),
+            catalog_type: "fileset".to_string(),
+            provider: provider.to_string(),
+            comment: "".to_string(),
+            properties,
+        }
+    }
+
+    pub(crate) fn create_test_fileset(name: &str, storage_location: &str) -> Fileset {
+        Fileset {
+            name: name.to_string(),
+            fileset_type: "managed".to_string(),
+            comment: "".to_string(),
+            storage_location: storage_location.to_string(),
+            properties: HashMap::default(),
+        }
+    }
+
     #[tokio::test]
     async fn test_get_fileset_success() {
         let fileset_response = r#"
diff --git a/clients/filesystem-fuse/src/gravitino_fileset_filesystem.rs b/clients/filesystem-fuse/src/gravitino_fileset_filesystem.rs
index 7da2f572dcc..04236dfe841 100644
--- a/clients/filesystem-fuse/src/gravitino_fileset_filesystem.rs
+++ b/clients/filesystem-fuse/src/gravitino_fileset_filesystem.rs
@@ -140,16 +140,27 @@ impl PathFileSystem for GravitinoFilesetFileSystem {
 
 #[cfg(test)]
 mod tests {
-    use crate::config::GravitinoConfig;
+    use crate::config::{AppConfig, GravitinoConfig};
+    use crate::default_raw_filesystem::DefaultRawFileSystem;
+    use crate::filesystem::tests::{TestPathFileSystem, TestRawFileSystem};
+    use crate::filesystem::{FileSystemContext, PathFileSystem, RawFileSystem};
+    use crate::gravitino_client::tests::{create_test_catalog, create_test_fileset};
+    use crate::gravitino_client::GravitinoClient;
     use crate::gravitino_fileset_filesystem::GravitinoFilesetFileSystem;
+    use crate::gvfs_creator::create_fs_with_fileset;
     use crate::memory_filesystem::MemoryFileSystem;
+    use crate::s3_filesystem::extract_s3_config;
+    use crate::s3_filesystem::tests::{cleanup_s3_fs, s3_test_config};
+    use crate::test_enable_with;
+    use crate::RUN_TEST_WITH_S3;
+    use std::collections::HashMap;
     use std::path::Path;
 
     #[tokio::test]
     async fn test_map_fileset_path_to_raw_path() {
         let fs = GravitinoFilesetFileSystem {
             physical_fs: Box::new(MemoryFileSystem::new().await),
-            client: super::GravitinoClient::new(&GravitinoConfig::default()),
+            client: GravitinoClient::new(&GravitinoConfig::default()),
             location: "/c1/fileset1".into(),
         };
         let path = fs.gvfs_path_to_raw_path(Path::new("/a"));
@@ -162,7 +173,7 @@ mod tests {
     async fn test_map_raw_path_to_fileset_path() {
         let fs = GravitinoFilesetFileSystem {
             physical_fs: Box::new(MemoryFileSystem::new().await),
-            client: super::GravitinoClient::new(&GravitinoConfig::default()),
+            client: GravitinoClient::new(&GravitinoConfig::default()),
             location: "/c1/fileset1".into(),
         };
         let path = fs
@@ -172,4 +183,68 @@ mod tests {
         let path = fs.raw_path_to_gvfs_path(Path::new("/c1/fileset1")).unwrap();
         assert_eq!(path, Path::new("/"));
     }
+
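+    // Builds a GravitinoFilesetFileSystem backed by an S3 bucket (LocalStack in
+    // CI). The catalog and fileset come from the gravitino_client test helpers,
+    // so no running Gravitino server is needed.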
+    async fn create_fileset_fs(path: &Path, config: &AppConfig) -> GravitinoFilesetFileSystem {
+        let opendal_config = extract_s3_config(config);
+
+        cleanup_s3_fs(path, &opendal_config).await;
+
+        let bucket = opendal_config.get("bucket").expect("Bucket must exist");
+        let endpoint = opendal_config.get("endpoint").expect("Endpoint must exist");
+
+        let catalog = create_test_catalog(
+            "c1",
+            "s3",
+            vec![
+                ("location".to_string(), format!("s3a://{}", bucket)),
+                ("s3-endpoint".to_string(), endpoint.to_string()),
+            ]
+            .into_iter()
+            .collect::<HashMap<String, String>>(),
+        );
+        let file_set_location = format!("s3a://{}{}", bucket, path.to_string_lossy());
+        let file_set = create_test_fileset("fileset1", &file_set_location);
+
+        let fs_context = FileSystemContext::default();
+        let inner_fs = create_fs_with_fileset(&catalog, &file_set, config, &fs_context)
+            .await
+            .unwrap();
+        GravitinoFilesetFileSystem::new(
+            inner_fs,
+            path,
+            GravitinoClient::new(&config.gravitino),
+            config,
+            &fs_context,
+        )
+        .await
+    }
+
+    #[tokio::test]
+    async fn s3_ut_test_fileset_file_system() {
+        test_enable_with!(RUN_TEST_WITH_S3);
+
+        let config = s3_test_config();
+        let cwd = Path::new("/gvfs_test3");
+        let fs = create_fileset_fs(cwd, &config).await;
+        let _ = fs.init().await;
+        let mut tester = TestPathFileSystem::new(Path::new("/"), fs);
+        tester.test_path_file_system().await;
+    }
+
+    #[tokio::test]
+    async fn s3_ut_test_fileset_with_raw_file_system() {
+        test_enable_with!(RUN_TEST_WITH_S3);
+
+        let config = s3_test_config();
+        let cwd = Path::new("/gvfs_test4");
+        let fileset_fs = create_fileset_fs(cwd, &config).await;
+        let raw_fs = DefaultRawFileSystem::new(
+            fileset_fs,
+            &AppConfig::default(),
+            &FileSystemContext::default(),
+        );
+        let _ = raw_fs.init().await;
+        let mut tester = TestRawFileSystem::new(Path::new("/"), raw_fs);
+        tester.test_raw_file_system().await;
+    }
 }
diff --git a/clients/filesystem-fuse/src/gvfs_creator.rs b/clients/filesystem-fuse/src/gvfs_creator.rs
index aac88ad9d08..88bc8a1b422 100644
--- a/clients/filesystem-fuse/src/gvfs_creator.rs
+++ b/clients/filesystem-fuse/src/gvfs_creator.rs
@@ -87,7 +87,7 @@ pub async fn create_gvfs_filesystem(
         .get_fileset(&catalog_name, &schema_name, &fileset_name)
         .await?;
 
-    let inner_fs = create_fs_with_fileset(&catalog, &fileset, config, fs_context)?;
+    let inner_fs = create_fs_with_fileset(&catalog, &fileset, config, fs_context).await?;
 
     let target_path = extract_root_path(fileset.storage_location.as_str())?;
     let fs =
@@ -95,7 +95,7 @@ pub async fn create_gvfs_filesystem(
     Ok(CreateFileSystemResult::Gvfs(fs))
 }
 
-fn create_fs_with_fileset(
+pub(crate) async fn create_fs_with_fileset(
     catalog: &Catalog,
     fileset: &Fileset,
     config: &AppConfig,
@@ -104,9 +104,9 @@ fn create_fs_with_fileset(
     let schema = extract_filesystem_scheme(&fileset.storage_location)?;
 
     match schema {
-        FileSystemSchema::S3 => Ok(Box::new(S3FileSystem::new(
-            catalog, fileset, config, fs_context,
-        )?)),
+        FileSystemSchema::S3 => Ok(Box::new(
+            S3FileSystem::new(catalog, fileset, config, fs_context).await?,
+        )),
     }
 }
 
diff --git a/clients/filesystem-fuse/src/lib.rs b/clients/filesystem-fuse/src/lib.rs
index 31e7c7fd8e1..41a9a5335d5 100644
--- a/clients/filesystem-fuse/src/lib.rs
+++ b/clients/filesystem-fuse/src/lib.rs
@@ -36,6 +36,19 @@ mod opened_file_manager;
 mod s3_filesystem;
 mod utils;
 
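+/// Skips the surrounding test at runtime unless the given environment variable
+/// is set (e.g. `RUN_TEST_WITH_S3=1`), so tests that need external services
+/// such as LocalStack or a mounted gvfs-fuse only run when enabled.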
+#[macro_export]
+macro_rules! test_enable_with {
+    ($env_var:expr) => {
+        if std::env::var($env_var).is_err() {
+            println!("Test skipped because {} is not set", $env_var);
+            return;
+        }
+    };
+}
+
+pub const RUN_TEST_WITH_S3: &str = "RUN_TEST_WITH_S3";
+pub const RUN_TEST_WITH_FUSE: &str = "RUN_TEST_WITH_FUSE";
+
 pub async fn gvfs_mount(mount_to: &str, mount_from: &str, config: &AppConfig) -> GvfsResult<()> {
     gvfs_fuse::mount(mount_to, mount_from, config).await
 }
diff --git a/clients/filesystem-fuse/src/open_dal_filesystem.rs b/clients/filesystem-fuse/src/open_dal_filesystem.rs
index e53fbaf6032..d32b014d1f0 100644
--- a/clients/filesystem-fuse/src/open_dal_filesystem.rs
+++ b/clients/filesystem-fuse/src/open_dal_filesystem.rs
@@ -261,22 +261,29 @@ fn opendal_filemode_to_filetype(mode: EntryMode) -> FileType {
 mod test {
     use crate::config::AppConfig;
     use crate::s3_filesystem::extract_s3_config;
+    use crate::s3_filesystem::tests::s3_test_config;
+    use crate::test_enable_with;
+    use crate::RUN_TEST_WITH_S3;
     use opendal::layers::LoggingLayer;
     use opendal::{services, Builder, Operator};
 
-    #[tokio::test]
-    async fn test_s3_stat() {
-        let config = AppConfig::from_file(Some("tests/conf/gvfs_fuse_s3.toml")).unwrap();
-        let opendal_config = extract_s3_config(&config);
-
+    fn create_opendal(config: &AppConfig) -> Operator {
+        let opendal_config = extract_s3_config(config);
         let builder = services::S3::from_map(opendal_config);
 
         // Init an operator
-        let op = Operator::new(builder)
+        Operator::new(builder)
             .expect("opendal create failed")
             .layer(LoggingLayer::default())
-            .finish();
+            .finish()
+    }
+
+    #[tokio::test]
+    async fn s3_ut_test_s3_stat() {
+        test_enable_with!(RUN_TEST_WITH_S3);
 
+        let config = s3_test_config();
+        let op = create_opendal(&config);
         let path = "/";
         let list = op.list(path).await;
         if let Ok(l) = list {
@@ -294,4 +301,30 @@ mod test {
             println!("stat error: {:?}", meta.err());
         }
     }
+
+    #[tokio::test]
+    async fn s3_ut_test_s3_delete() {
+        test_enable_with!(RUN_TEST_WITH_S3);
+        let config = s3_test_config();
+
+        let op = create_opendal(&config);
+        let path = "/s1/fileset1/gvfs_test/test_dir/test_file";
+
+        let meta = op.stat(path).await;
+        if let Ok(m) = meta {
+            println!("stat result: {:?}", m);
+        } else {
+            println!("stat error: {:?}", meta.err());
+        }
+
+        let result = op.remove(vec![path.to_string()]).await;
+        match result {
+            Ok(_) => {
+                println!("Delete successful (or no-op).");
+            }
+            Err(e) => {
+                println!("Delete failed: {:?}", e);
+            }
+        }
+    }
 }
diff --git a/clients/filesystem-fuse/src/s3_filesystem.rs b/clients/filesystem-fuse/src/s3_filesystem.rs
index e0ca69b4ccf..35a091b3fe1 100644
--- a/clients/filesystem-fuse/src/s3_filesystem.rs
+++ b/clients/filesystem-fuse/src/s3_filesystem.rs
@@ -40,7 +40,7 @@ impl S3FileSystem {}
 impl S3FileSystem {
     const S3_CONFIG_PREFIX: &'static str = "s3-";
 
-    pub(crate) fn new(
+    pub(crate) async fn new(
         catalog: &Catalog,
         fileset: &Fileset,
         config: &AppConfig,
@@ -48,10 +48,20 @@ impl S3FileSystem {
     ) -> GvfsResult<Self> {
         let mut opendal_config = extract_s3_config(config);
         let bucket = extract_bucket(&fileset.storage_location)?;
-        opendal_config.insert("bucket".to_string(), bucket);
+        opendal_config.insert("bucket".to_string(), bucket.to_string());
 
-        let region = Self::get_s3_region(catalog)?;
-        opendal_config.insert("region".to_string(), region);
+        let endpoint = catalog
+            .properties
+            .get("s3-endpoint")
+            .ok_or_else(|| InvalidConfig.to_error("s3-endpoint is required".to_string()))?;
+        opendal_config.insert("endpoint".to_string(), endpoint.clone());
+
+        let region = Self::get_s3_region(catalog, &bucket).await.ok_or_else(|| {
+            InvalidConfig
+                .to_error("s3-region is not configured and could not be detected".to_string())
+        })?;
+        opendal_config.insert("region".to_string(), region);
 
         let builder = S3::from_map(opendal_config);
 
@@ -67,16 +77,13 @@ impl S3FileSystem {
         })
     }
 
-    fn get_s3_region(catalog: &Catalog) -> GvfsResult<String> {
+    async fn get_s3_region(catalog: &Catalog, bucket: &str) -> Option<String> {
         if let Some(region) = catalog.properties.get("s3-region") {
-            Ok(region.clone())
+            Some(region.clone())
         } else if let Some(endpoint) = catalog.properties.get("s3-endpoint") {
-            extract_region(endpoint)
+            S3::detect_region(endpoint, bucket).await
         } else {
-            Err(InvalidConfig.to_error(format!(
-                "Cant not retrieve region in the Catalog {}",
-                catalog.name
-            )))
+            None
         }
     }
 }
@@ -139,25 +146,11 @@ pub(crate) fn extract_bucket(location: &str) -> GvfsResult<String> {
     }
 }
 
-pub(crate) fn extract_region(location: &str) -> GvfsResult<String> {
-    let url = parse_location(location)?;
-    match url.host_str() {
-        Some(host) => {
-            let parts: Vec<&str> = host.split('.').collect();
-            if parts.len() > 1 {
-                Ok(parts[1].to_string())
-            } else {
-                Err(InvalidConfig.to_error(format!(
-                    "Invalid location: expected region in host, got {}",
-                    location
-                )))
-            }
-        }
-        None => Err(InvalidConfig.to_error(format!(
-            "Invalid fileset location without bucket: {}",
-            location
-        ))),
-    }
+pub(crate) fn extract_region(location: &str) -> Option<String> {
+    parse_location(location).ok().and_then(|url| {
+        url.host_str()
+            .and_then(|host| host.split('.').nth(1).map(|part| part.to_string()))
+    })
 }
 
 pub fn extract_s3_config(config: &AppConfig) -> HashMap<String, String> {
@@ -181,11 +174,13 @@ pub fn extract_s3_config(config: &AppConfig) -> HashMap<String, String> {
 }
 
 #[cfg(test)]
-mod tests {
+pub(crate) mod tests {
     use super::*;
     use crate::default_raw_filesystem::DefaultRawFileSystem;
     use crate::filesystem::tests::{TestPathFileSystem, TestRawFileSystem};
     use crate::filesystem::RawFileSystem;
+    use crate::test_enable_with;
+    use crate::RUN_TEST_WITH_S3;
     use opendal::layers::TimeoutLayer;
     use std::time::Duration;
 
@@ -201,11 +196,11 @@ mod tests {
     fn test_extract_region() {
         let location = "http://s3.ap-southeast-2.amazonaws.com";
         let result = extract_region(location);
-        assert!(result.is_ok());
+        assert!(result.is_some());
         assert_eq!(result.unwrap(), "ap-southeast-2");
     }
 
-    async fn delete_dir(op: &Operator, dir_name: &str) {
+    pub(crate) async fn delete_dir(op: &Operator, dir_name: &str) {
         let childs = op.list(dir_name).await.expect("list dir failed");
         for child in childs {
             let child_name = dir_name.to_string() + child.name();
@@ -218,13 +213,11 @@ mod tests {
         op.delete(dir_name).await.expect("delete dir failed");
     }
 
-    async fn create_s3_fs(cwd: &Path) -> S3FileSystem {
-        let config = AppConfig::from_file(Some("tests/conf/gvfs_fuse_s3.toml")).unwrap();
-        let opendal_config = extract_s3_config(&config);
-
-        let fs_context = FileSystemContext::default();
-
-        let builder = S3::from_map(opendal_config);
+    pub(crate) async fn cleanup_s3_fs(
+        cwd: &Path,
+        opendal_config: &HashMap<String, String>,
+    ) -> Operator {
+        let builder = S3::from_map(opendal_config.clone());
         let op = Operator::new(builder)
             .expect("opendal create failed")
             .layer(LoggingLayer::default())
@@ -241,18 +234,37 @@ mod tests {
         op.create_dir(&file_name)
             .await
             .expect("create test dir failed");
+        op
+    }
+
+    async fn create_s3_fs(cwd: &Path, config: &AppConfig) -> S3FileSystem {
+        let opendal_config = extract_s3_config(config);
+        let op = cleanup_s3_fs(cwd, &opendal_config).await;
+
+        let fs_context = FileSystemContext::default();
+        let open_dal_fs = OpenDalFileSystem::new(op, config, &fs_context);
 
-        let open_dal_fs = OpenDalFileSystem::new(op, &config, &fs_context);
         S3FileSystem { open_dal_fs }
     }
 
-    #[tokio::test]
-    async fn test_s3_file_system() {
-        if std::env::var("RUN_S3_TESTS").is_err() {
-            return;
+    pub(crate) fn s3_test_config() -> AppConfig {
+        let mut config_file_name = "target/conf/gvfs_fuse_s3.toml";
+        let source_file_name = "tests/conf/gvfs_fuse_s3.toml";
+
+        if !Path::new(config_file_name).exists() {
+            config_file_name = source_file_name;
         }
+
+        AppConfig::from_file(Some(config_file_name)).unwrap()
+    }
+
+    #[tokio::test]
+    async fn s3_ut_test_s3_file_system() {
+        test_enable_with!(RUN_TEST_WITH_S3);
+
+        let config = s3_test_config();
         let cwd = Path::new("/gvfs_test1");
-        let fs = create_s3_fs(cwd).await;
+        let fs = create_s3_fs(cwd, &config).await;
 
         let _ = fs.init().await;
         let mut tester = TestPathFileSystem::new(cwd, fs);
@@ -260,13 +272,12 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn test_s3_file_system_with_raw_file_system() {
-        if std::env::var("RUN_S3_TESTS").is_err() {
-            return;
-        }
+    async fn s3_ut_test_s3_file_system_with_raw_file_system() {
+        test_enable_with!(RUN_TEST_WITH_S3);
 
+        let config = s3_test_config();
         let cwd = Path::new("/gvfs_test2");
-        let s3_fs = create_s3_fs(cwd).await;
+        let s3_fs = create_s3_fs(cwd, &config).await;
         let raw_fs =
             DefaultRawFileSystem::new(s3_fs, &AppConfig::default(), &FileSystemContext::default());
         let _ = raw_fs.init().await;
diff --git a/clients/filesystem-fuse/tests/bin/env.sh b/clients/filesystem-fuse/tests/bin/env.sh
new file mode 100644
index 00000000000..c2e0b23be05
--- /dev/null
+++ b/clients/filesystem-fuse/tests/bin/env.sh
@@ -0,0 +1,65 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+cd "$SCRIPT_DIR"
+
+S3_ACCESS_KEY_ID=${S3_ACCESS_KEY_ID:-test}
+S3_SECRET_ACCESS=${S3_SECRET_ACCESS:-test}
+S3_REGION=${S3_REGION:-ap-southeast-2}
+S3_BUCKET=${S3_BUCKET:-my-bucket}
+S3_ENDPOINT=${S3_ENDPOINT:-http://127.0.0.1:4566}
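+# The defaults above match the LocalStack setup in localstatck.sh; override the
+# S3_* variables to test against a real AWS S3 endpoint.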
+
+# Check required environment variables
+if [[ -z "$S3_ACCESS_KEY_ID" || -z "$S3_SECRET_ACCESS" || -z "$S3_REGION" || -z "$S3_BUCKET" || -z "$S3_ENDPOINT" ]]; then
+  echo "Error: One or more required S3 environment variables are not set."
+  echo "Please set: S3_ACCESS_KEY_ID, S3_SECRET_ACCESS, S3_REGION, S3_BUCKET, S3_ENDPOINT."
+  exit 1
+fi
+
+DISABLE_LOCALSTACK=${DISABLE_LOCALSTACK:-0}
+# If the S3 endpoint is not the LocalStack default, disable LocalStack.
+if [[ "$S3_ENDPOINT" != "http://127.0.0.1:4566" ]]; then
+  echo "Custom S3 endpoint detected, disabling LocalStack"
+  DISABLE_LOCALSTACK=1
+fi
+
+GRAVITINO_HOME=../../../..
+GRAVITINO_HOME=$(cd $GRAVITINO_HOME && pwd)
+GRAVITINO_SERVER_DIR=$GRAVITINO_HOME/distribution/package
+CLIENT_FUSE_DIR=$GRAVITINO_HOME/clients/filesystem-fuse
+
+generate_test_config() {
+  local config_dir
+  config_dir=$(dirname "$TEST_CONFIG_FILE")
+  mkdir -p "$config_dir"
+
+  awk -v access_key="$S3_ACCESS_KEY_ID" \
+      -v secret_key="$S3_SECRET_ACCESS" \
+      -v region="$S3_REGION" \
+      -v bucket="$S3_BUCKET" \
+      -v endpoint="$S3_ENDPOINT" \
+      'BEGIN { in_extend_config = 0 }
+      /^\[extend_config\]/ { in_extend_config = 1 }
+      in_extend_config && /s3-access_key_id/ { $0 = "s3-access_key_id = \"" access_key "\"" }
+      in_extend_config && /s3-secret_access_key/ { $0 = "s3-secret_access_key = \"" secret_key "\"" }
+      in_extend_config && /s3-region/ { $0 = "s3-region = \"" region "\"" }
+      in_extend_config && /s3-bucket/ { $0 = "s3-bucket = \"" bucket "\"" }
+      in_extend_config && /s3-endpoint/ { $0 = "s3-endpoint = \"" endpoint "\"" }
+      { print }' "$CLIENT_FUSE_DIR/tests/conf/gvfs_fuse_s3.toml" > "$TEST_CONFIG_FILE"
+}
diff --git a/clients/filesystem-fuse/tests/bin/gravitino_server.sh b/clients/filesystem-fuse/tests/bin/gravitino_server.sh
new file mode 100644
index 00000000000..0f9b0fdab98
--- /dev/null
+++ b/clients/filesystem-fuse/tests/bin/gravitino_server.sh
@@ -0,0 +1,116 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+GRAVITINO_SERVER_URL="http://localhost:8090"
+
+check_gravitino_server_ready() {
+  local url=$1
+  local retries=10  # Number of retries
+  local wait_time=1 # Wait time between retries (seconds)
+
+  for ((i=1; i<=retries; i++)); do
+    if curl --silent --head --fail "$url/api/metalakes" >/dev/null; then
+      echo "Gravitino server is ready."
+      return 0
+    else
+      echo "Attempt $i/$retries: Server not ready. Retrying in $wait_time seconds..."
+      sleep "$wait_time"
+    fi
+  done
+
+  echo "Error: Gravitino server did not become ready after $((retries * wait_time)) seconds."
+  exit 1
+}
+
+create_resource() {
+  local url=$1
+  local data=$2
+
+  response=$(curl -s -w "\n%{http_code}" -X POST -H "Accept: application/vnd.gravitino.v1+json" \
+    -H "Content-Type: application/json" -d "$data" "$url")
+
+  body=$(echo "$response" | sed '$d') # everything except the trailing status code
+  response_code=$(echo "$response" | tail -n 1)
+
+  # Check if the response code is not 2xx
+  if [[ "$response_code" -lt 200 || "$response_code" -ge 300 ]]; then
+    echo "Error: Failed to create resource. Status code: $response_code"
+    echo "Response body: $body"
+    exit 1
+  fi
+}
+
+
+start_gravitino_server() {
+  echo "Starting Gravitino Server"
+  # copy the aws-bundle to the server
+  if ls $GRAVITINO_SERVER_DIR/catalogs/hadoop/libs/gravitino-aws-bundle-*-incubating-SNAPSHOT.jar 1>/dev/null 2>&1; then
+     echo "File exists, skipping copy."
+  else
+    echo "Copying the aws-bundle to the server"
+    cp $GRAVITINO_HOME/bundles/aws-bundle/build/libs/gravitino-aws-bundle-*-incubating-SNAPSHOT.jar \
+      $GRAVITINO_SERVER_DIR/catalogs/hadoop/libs
+  fi
+
+  rm -rf $GRAVITINO_SERVER_DIR/data
+  $GRAVITINO_SERVER_DIR/bin/gravitino.sh restart
+
+  check_gravitino_server_ready $GRAVITINO_SERVER_URL
+
+  # Create metalake
+  create_resource "$GRAVITINO_SERVER_URL/api/metalakes" '{
+    "name":"test",
+    "comment":"comment",
+    "properties":{}
+  }'
+
+  # Create catalog
+  create_resource "$GRAVITINO_SERVER_URL/api/metalakes/test/catalogs" '{
+    "name": "c1",
+    "type": "FILESET",
+    "comment": "comment",
+    "provider": "hadoop",
+    "properties": {
+      "location": "s3a://'"$S3_BUCKET"'",
+      "s3-access-key-id": "'"$S3_ACCESS_KEY_ID"'",
+      "s3-secret-access-key": "'"$S3_SECRET_ACCESS"'",
+      "s3-endpoint": "'"$S3_ENDPOINT"'",
+      "filesystem-providers": "s3"
+    }
+  }'
+
+  # Create schema
+  create_resource "$GRAVITINO_SERVER_URL/api/metalakes/test/catalogs/c1/schemas" '{
+    "name":"s1",
+    "comment":"comment",
+    "properties":{}
+  }'
+
+  # Create FILESET
+  create_resource "$GRAVITINO_SERVER_URL/api/metalakes/test/catalogs/c1/schemas/s1/filesets" '{
+    "name":"fileset1",
+    "comment":"comment",
+    "properties":{}
+  }'
+}
+
+stop_gravitino_server() {
+  $GRAVITINO_SERVER_DIR/bin/gravitino.sh stop
+  echo "Gravitino Server stopped"
+}
\ No newline at end of file
diff --git a/clients/filesystem-fuse/tests/bin/gvfs_fuse.sh b/clients/filesystem-fuse/tests/bin/gvfs_fuse.sh
new file mode 100644
index 00000000000..e706d8e2c0d
--- /dev/null
+++ b/clients/filesystem-fuse/tests/bin/gvfs_fuse.sh
@@ -0,0 +1,65 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+check_gvfs_fuse_ready() {
+  local retries=10
+  local wait_time=1
+
+  for ((i=1; i<=retries; i++)); do
+    # Check whether $MOUNT_DIR/.gvfs_meta exists
+    if [ -f "$MOUNT_DIR/.gvfs_meta" ]; then
+      echo "Gvfs fuse is ready."
+      return 0
+    else
+      echo "Attempt $i/$retries: Gvfs fuse not ready. Retrying in $wait_time seconds..."
+      sleep "$wait_time"
+    fi
+  done
+
+  echo "Error: Gvfs fuse did not become ready after $((retries * wait_time)) seconds."
+  tail -n 100 $CLIENT_FUSE_DIR/target/debug/fuse.log
+  exit 1
+}
+
+start_gvfs_fuse() {
+  MOUNT_DIR=$CLIENT_FUSE_DIR/target/gvfs
+
+  umount $MOUNT_DIR > /dev/null 2>&1 || true
+  if [ ! -d "$MOUNT_DIR" ]; then
+    echo "Create the mount point"
+    mkdir -p $MOUNT_DIR
+  fi
+
+  MOUNT_FROM_LOCATION=gvfs://fileset/test/c1/s1/fileset1
+
+  # Build the gvfs-fuse
+  cd $CLIENT_FUSE_DIR
+  make build
+
+  echo "Starting gvfs-fuse-daemon"
+  $CLIENT_FUSE_DIR/target/debug/gvfs-fuse $MOUNT_DIR $MOUNT_FROM_LOCATION $TEST_CONFIG_FILE > \
+    $CLIENT_FUSE_DIR/target/debug/fuse.log 2>&1 &
+  check_gvfs_fuse_ready
+  cd -
+}
+
+stop_gvfs_fuse() {
+  # Stop the gvfs-fuse process if it's running
+  pkill -INT gvfs-fuse || true
+  echo "Stopping gvfs-fuse-daemon"
+}
\ No newline at end of file
diff --git a/clients/filesystem-fuse/tests/bin/localstatck.sh b/clients/filesystem-fuse/tests/bin/localstatck.sh
new file mode 100644
index 00000000000..fa4552d48a3
--- /dev/null
+++ b/clients/filesystem-fuse/tests/bin/localstatck.sh
@@ -0,0 +1,46 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+start_localstack() {
+if [ "$DISABLE_LOCALSTACK" -eq 1 ]; then
+  return
+fi
+
+  echo "Starting localstack..."
+  docker run -d -p 4566:4566 -p 4571:4571 --name localstack localstack/localstack
+  echo "Localstack started"
+
+  docker exec localstack sh -c "\
+    aws configure set aws_access_key_id $S3_ACCESS_KEY_ID && \
+    aws configure set aws_secret_access_key $S3_SECRET_ACCESS && \
+    aws configure set region $S3_REGION && \
+    aws configure set output json"
+
+  docker exec localstack awslocal s3 mb s3://$S3_BUCKET
+}
+
+stop_localstack() {
+if [ "$DISABLE_LOCALSTACK" -eq 1 ]; then
+  return
+fi
+
+  echo "Stopping localstack..."
+  docker stop localstack 2>/dev/null || true
+  docker rm localstack 2>/dev/null || true
+  echo "Localstack stopped"
+}
\ No newline at end of file
diff --git a/clients/filesystem-fuse/tests/bin/run_fuse_testers.sh b/clients/filesystem-fuse/tests/bin/run_fuse_testers.sh
new file mode 100755
index 00000000000..6dc38c48f07
--- /dev/null
+++ b/clients/filesystem-fuse/tests/bin/run_fuse_testers.sh
@@ -0,0 +1,70 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+set -euo pipefail
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+cd "$SCRIPT_DIR"
+
+source ./env.sh
+source ./gravitino_server.sh
+source ./gvfs_fuse.sh
+source ./localstatck.sh
+
+TEST_CONFIG_FILE=$CLIENT_FUSE_DIR/target/debug/gvfs-fuse.toml
+
+start_servers() {
+  start_localstack
+  start_gravitino_server
+  generate_test_config
+  start_gvfs_fuse
+}
+
+stop_servers() {
+  set +e
+  stop_gvfs_fuse
+  stop_gravitino_server
+  stop_localstack
+}
+
+# Main logic based on parameters
+if [ "$1" == "test" ]; then
+  trap stop_servers EXIT
+  start_servers
+  # Run the FUSE integration tests (gated on RUN_TEST_WITH_FUSE)
+  echo "Running tests..."
+  cd $CLIENT_FUSE_DIR
+  export RUN_TEST_WITH_FUSE=1
+  cargo test --test fuse_test fuse_it_
+
+elif [ "$1" == "start" ]; then
+  # Start the servers
+  echo "Starting servers..."
+  start_servers
+
+elif [ "$1" == "stop" ]; then
+  # Stop the servers
+  echo "Stopping servers..."
+  stop_servers
+
+else
+  echo "Usage: $0 {test|start|stop}"
+  exit 1
+fi
diff --git a/clients/filesystem-fuse/tests/bin/run_s3fs_testers.sh b/clients/filesystem-fuse/tests/bin/run_s3fs_testers.sh
new file mode 100644
index 00000000000..ac5f9812c93
--- /dev/null
+++ b/clients/filesystem-fuse/tests/bin/run_s3fs_testers.sh
@@ -0,0 +1,64 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+set -euo pipefail
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+cd "$SCRIPT_DIR"
+
+source ./env.sh
+source ./localstatck.sh
+
+TEST_CONFIG_FILE=$CLIENT_FUSE_DIR/target/conf/gvfs_fuse_s3.toml
+
+start_servers() {
+  start_localstack
+  generate_test_config
+}
+
+stop_servers() {
+  set +e
+  stop_localstack
+}
+
+# Main logic based on parameters
+if [ "$1" == "test" ]; then
+  trap stop_servers EXIT
+  start_servers
+  # Run the S3 storage tests (gated on RUN_TEST_WITH_S3)
+  echo "Running tests..."
+  cd $CLIENT_FUSE_DIR
+  export RUN_TEST_WITH_S3=1
+  cargo test s3_ut_ --lib
+
+elif [ "$1" == "start" ]; then
+  # Start the servers
+  echo "Starting servers..."
+  start_servers
+
+elif [ "$1" == "stop" ]; then
+  # Stop the servers
+  echo "Stopping servers..."
+  stop_servers
+
+else
+  echo "Usage: $0 {test|start|stop}"
+  exit 1
+fi
diff --git a/clients/filesystem-fuse/tests/conf/gvfs_fuse_s3.toml b/clients/filesystem-fuse/tests/conf/gvfs_fuse_s3.toml
index 7d182cd40df..d0ff8e5ddec 100644
--- a/clients/filesystem-fuse/tests/conf/gvfs_fuse_s3.toml
+++ b/clients/filesystem-fuse/tests/conf/gvfs_fuse_s3.toml
@@ -19,7 +19,7 @@
 [fuse]
 file_mask= 0o600
 dir_mask= 0o700
-fs_type = "memory"
+fs_type = "gvfs"
 
 [fuse.properties]
 key1 = "value1"
@@ -40,4 +40,5 @@ s3-access_key_id = "XXX_access_key"
 s3-secret_access_key = "XXX_secret_key"
 s3-region = "XXX_region"
 s3-bucket = "XXX_bucket"
+s3-endpoint = "XXX_endpoint"
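+# The XXX_* placeholders are replaced with real values by generate_test_config
+# (tests/bin/env.sh) when the test config is rendered under target/.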
 
diff --git a/clients/filesystem-fuse/tests/fuse_test.rs b/clients/filesystem-fuse/tests/fuse_test.rs
index d06199d782e..41e385c49f1 100644
--- a/clients/filesystem-fuse/tests/fuse_test.rs
+++ b/clients/filesystem-fuse/tests/fuse_test.rs
@@ -19,7 +19,8 @@
 
 use fuse3::Errno;
 use gvfs_fuse::config::AppConfig;
-use gvfs_fuse::{gvfs_mount, gvfs_unmount};
+use gvfs_fuse::RUN_TEST_WITH_FUSE;
+use gvfs_fuse::{gvfs_mount, gvfs_unmount, test_enable_with};
 use log::{error, info};
 use std::fs::File;
 use std::path::Path;
@@ -85,7 +86,7 @@ impl Drop for FuseTest {
 }
 
 #[test]
-fn test_fuse_system_with_auto() {
+fn test_fuse_with_memory_fs() {
     tracing_subscriber::fmt().init();
 
     panic::set_hook(Box::new(|info| {
@@ -106,14 +107,21 @@ fn test_fuse_system_with_auto() {
     test_fuse_filesystem(mount_point);
 }
 
-fn test_fuse_system_with_manual() {
-    test_fuse_filesystem("build/gvfs");
+#[test]
+fn fuse_it_test_fuse() {
+    test_enable_with!(RUN_TEST_WITH_FUSE);
+
+    test_fuse_filesystem("target/gvfs/gvfs_test");
 }
 
 fn test_fuse_filesystem(mount_point: &str) {
     info!("Test startup");
     let base_path = Path::new(mount_point);
 
+    if !file_exists(base_path) {
+        fs::create_dir_all(base_path).expect("Failed to create test dir");
+    }
+
     //test create file
     let test_file = base_path.join("test_create");
     let file = File::create(&test_file).expect("Failed to create file");
@@ -124,12 +132,12 @@ fn test_fuse_filesystem(mount_point: &str) {
     fs::write(&test_file, "read test").expect("Failed to write file");
 
     //test read file
-    let content = fs::read_to_string(test_file.clone()).expect("Failed to read file");
+    let content = fs::read_to_string(&test_file).expect("Failed to read file");
     assert_eq!(content, "read test", "File content mismatch");
 
     //test delete file
-    fs::remove_file(test_file.clone()).expect("Failed to delete file");
-    assert!(!file_exists(test_file));
+    fs::remove_file(&test_file).expect("Failed to delete file");
+    assert!(!file_exists(&test_file));
 
     //test create directory
     let test_dir = base_path.join("test_dir");