From 6f782a6fe458043e0316730357c78036b157448a Mon Sep 17 00:00:00 2001
From: Vibhu Jawa
Date: Wed, 12 Feb 2025 09:49:16 -0800
Subject: [PATCH] Fix GPU CI tests by creating 1 cluster across sessions (#540)

* Fix GPU CI tests

Signed-off-by: Vibhu Jawa

* Remove unused imports

Signed-off-by: Vibhu Jawa

* Skip GPU Cluster creation if pytest is marked as CPU

Signed-off-by: Vibhu Jawa

---------

Signed-off-by: Vibhu Jawa
---
 conftest.py               | 20 +++++++++++++
 tests/test_backends.py    | 63 ++++++++++++++++++---------------------
 tests/test_classifiers.py | 12 +-------
 tests/test_fuzzy_dedup.py | 28 ++++++++---------
 tests/test_semdedup.py    | 11 ++-----
 5 files changed, 66 insertions(+), 68 deletions(-)

diff --git a/conftest.py b/conftest.py
index 451ae5af..7aa3adb0 100644
--- a/conftest.py
+++ b/conftest.py
@@ -1,4 +1,11 @@
 import pytest
+from dask.distributed import Client
+
+from nemo_curator.utils.import_utils import gpu_only_import, gpu_only_import_from
+
+cudf = gpu_only_import("cudf")
+dask_cudf = gpu_only_import("dask_cudf")
+LocalCUDACluster = gpu_only_import_from("dask_cuda", "LocalCUDACluster")
 
 
 def pytest_addoption(parser):
@@ -13,3 +20,16 @@ def pytest_collection_modifyitems(config, items):
         for item in items:
             if "gpu" in item.keywords:
                 item.add_marker(skip_gpu)
+
+
+@pytest.fixture(autouse=True, scope="session")
+def gpu_client(request):
+    if not request.config.getoption("--cpu"):
+        with LocalCUDACluster(n_workers=1) as cluster, Client(cluster) as client:
+            request.session.client = client
+            request.session.cluster = cluster
+            yield client
+            client.close()
+            cluster.close()
+    else:
+        yield None
diff --git a/tests/test_backends.py b/tests/test_backends.py
index 6acd87b5..112bde6f 100644
--- a/tests/test_backends.py
+++ b/tests/test_backends.py
@@ -14,7 +14,6 @@
 import pandas as pd
 import pytest
 from dask.dataframe.utils import assert_eq
-from distributed import Client
 
 from nemo_curator import (
     BaseModule,
@@ -26,11 +25,10 @@
 )
 from nemo_curator.datasets import DocumentDataset
 from nemo_curator.filters import MeanWordLengthFilter
-from nemo_curator.utils.import_utils import gpu_only_import, gpu_only_import_from
+from nemo_curator.utils.import_utils import gpu_only_import
 
 cudf = gpu_only_import("cudf")
 dask_cudf = gpu_only_import("dask_cudf")
-LocalCUDACluster = gpu_only_import_from("dask_cuda", "LocalCUDACluster")
 
 
 class CPUModule(BaseModule):
@@ -98,18 +96,12 @@ def gpu_data(raw_data):
 
 @pytest.mark.gpu
 class TestBackendSupport:
-    @pytest.fixture(autouse=True, scope="class")
-    def gpu_client(self, request):
-        with LocalCUDACluster(n_workers=1) as cluster, Client(cluster) as client:
-            request.cls.client = client
-            request.cls.cluster = cluster
-            yield
-
     def test_pandas_backend(
         self,
         cpu_data,
+        gpu_client,
     ):
-        print("client", self.client)
+        print("client", gpu_client)
         dataset, gt_lengths = cpu_data
         pipeline = CPUModule()
         result = pipeline(dataset)
@@ -119,8 +111,9 @@ def test_pandas_backend(
     def test_cudf_backend(
         self,
         gpu_data,
+        gpu_client,
     ):
-        print("client", self.client)
+        print("client", gpu_client)
         dataset, gt_lengths = gpu_data
         pipeline = GPUModule()
         result = pipeline(dataset)
@@ -131,8 +124,9 @@ def test_any_backend(
         self,
         cpu_data,
         gpu_data,
+        gpu_client,
     ):
-        print("client", self.client)
+        print("client", gpu_client)
         cpu_dataset, gt_cpu_lengths = cpu_data
         gt_cpu_lengths = gt_cpu_lengths.rename("any_lengths")
         gpu_dataset, gt_gpu_lengths = gpu_data
@@ -150,8 +144,9 @@ def test_pandas_to_cudf(
         self,
         cpu_data,
         gpu_data,
+        gpu_client,
     ):
-        print("client", self.client)
+        print("client", gpu_client)
         dataset, gt_cpu_lengths = cpu_data
         _, gt_gpu_lengths = gpu_data
         pipeline = Sequential(
@@ -170,8 +165,9 @@ def test_cudf_to_pandas(
         self,
         cpu_data,
         gpu_data,
+        gpu_client,
     ):
-        print("client", self.client)
+        print("client", gpu_client)
         _, gt_cpu_lengths = cpu_data
         dataset, gt_gpu_lengths = gpu_data
         pipeline = Sequential(
@@ -190,8 +186,9 @@ def test_5x_switch(
         self,
         cpu_data,
         gpu_data,
+        gpu_client,
     ):
-        print("client", self.client)
+        print("client", gpu_client)
         dataset, gt_cpu_lengths = cpu_data
         _, gt_gpu_lengths = gpu_data
         pipeline = Sequential(
@@ -220,25 +217,25 @@ def test_5x_switch(
         assert_eq(result_df["cpu_lengths"], gt_cpu_lengths)
         assert_eq(result_df["gpu_lengths"], gt_gpu_lengths)
 
-    def test_wrong_backend_cpu_data(self, cpu_data):
+    def test_wrong_backend_cpu_data(self, cpu_data, gpu_client):
         with pytest.raises(ValueError):
-            print("client", self.client)
+            print("client", gpu_client)
             dataset, _ = cpu_data
             pipeline = GPUModule()
             result = pipeline(dataset)
             _ = result.df.compute()
 
-    def test_wrong_backend_gpu_data(self, gpu_data):
+    def test_wrong_backend_gpu_data(self, gpu_data, gpu_client):
         with pytest.raises(ValueError):
-            print("client", self.client)
+            print("client", gpu_client)
             dataset, _ = gpu_data
             pipeline = CPUModule()
             result = pipeline(dataset)
             _ = result.df.compute()
 
-    def test_unsupported_to_backend(self, cpu_data):
+    def test_unsupported_to_backend(self, cpu_data, gpu_client):
         with pytest.raises(ValueError):
-            print("client", self.client)
+            print("client", gpu_client)
             dataset, _ = cpu_data
             pipeline = ToBackend("fake_backend")
             result = pipeline(dataset)
@@ -281,18 +278,12 @@ def real_module_gpu_data(real_module_raw_data):
 
 @pytest.mark.gpu
 class TestRealModules:
-    @pytest.fixture(autouse=True, scope="class")
-    def gpu_client(self, request):
-        with LocalCUDACluster(n_workers=1) as cluster, Client(cluster) as client:
-            request.cls.client = client
-            request.cls.cluster = cluster
-            yield
-
     def test_score_filter(
         self,
         real_module_cpu_data,
+        gpu_client,
     ):
-        print("client", self.client)
+        print("client", gpu_client)
         dataset, gt_results = real_module_cpu_data
         pipeline = ScoreFilter(
             MeanWordLengthFilter(), score_field="mean_lengths", score_type=float
@@ -304,9 +295,10 @@ def test_score_filter_wrong_backend(
         self,
         real_module_gpu_data,
+        gpu_client,
     ):
         with pytest.raises(ValueError):
-            print("client", self.client)
+            print("client", gpu_client)
             dataset, _ = real_module_gpu_data
             pipeline = ScoreFilter(
                 MeanWordLengthFilter(), score_field="mean_lengths", score_type=float
@@ -318,8 +310,9 @@ def test_fuzzy_dedup(
         self,
         real_module_gpu_data,
         tmpdir,
+        gpu_client,
     ):
-        print(self.client)
+        print(gpu_client)
         dataset, gt_results = real_module_gpu_data
         # Dedup might fail when indices per partition do not start from 0
         dataset.df = dataset.df.reset_index(drop=True)
@@ -355,9 +348,10 @@ def test_fuzzy_dedup_wrong_backend(
         self,
         real_module_cpu_data,
         tmpdir,
+        gpu_client,
     ):
         with pytest.raises(ValueError):
-            print(self.client)
+            print(gpu_client)
             dataset, _ = real_module_cpu_data
             # Dedup might fail when indices per partition do not start from 0
             dataset.df = dataset.df.reset_index(drop=True)
@@ -384,8 +378,9 @@ def test_score_filter_and_fuzzy(
         real_module_cpu_data,
         real_module_gpu_data,
         tmpdir,
+        gpu_client,
     ):
-        print("client", self.client)
+        print("client", gpu_client)
         dataset, _ = real_module_cpu_data
         _, gt_results = real_module_gpu_data
         dataset.df = dataset.df.reset_index(drop=True)
diff --git a/tests/test_classifiers.py b/tests/test_classifiers.py
index 1d37e7f5..388c1ebd 100644
--- a/tests/test_classifiers.py
+++ b/tests/test_classifiers.py
@@ -13,22 +13,12 @@
 # limitations under the License.
 
 import pytest
-from distributed import Client
 
 from nemo_curator.datasets import DocumentDataset
-from nemo_curator.utils.import_utils import gpu_only_import, gpu_only_import_from
+from nemo_curator.utils.import_utils import gpu_only_import
 
 cudf = gpu_only_import("cudf")
 dask_cudf = gpu_only_import("dask_cudf")
-LocalCUDACluster = gpu_only_import_from("dask_cuda", "LocalCUDACluster")
-
-
-@pytest.fixture
-def gpu_client(request):
-    with LocalCUDACluster(n_workers=1) as cluster, Client(cluster) as client:
-        request.client = client
-        request.cluster = cluster
-        yield
 
 
 @pytest.fixture
diff --git a/tests/test_fuzzy_dedup.py b/tests/test_fuzzy_dedup.py
index bb9810ce..53fbb102 100644
--- a/tests/test_fuzzy_dedup.py
+++ b/tests/test_fuzzy_dedup.py
@@ -22,16 +22,14 @@
 import yaml
 from dask import config
 from dask.dataframe.utils import assert_eq
-from distributed import Client
 
 from nemo_curator import LSH, FuzzyDuplicates, FuzzyDuplicatesConfig, MinHash
 from nemo_curator.datasets import DocumentDataset
 from nemo_curator.utils.fuzzy_dedup_utils.merge_utils import extract_partitioning_index
-from nemo_curator.utils.import_utils import gpu_only_import, gpu_only_import_from
+from nemo_curator.utils.import_utils import gpu_only_import
 
 cudf = gpu_only_import("cudf")
 dask_cudf = gpu_only_import("dask_cudf")
-LocalCUDACluster = gpu_only_import_from("dask_cuda", "LocalCUDACluster")
 
 
 @pytest.fixture
@@ -303,13 +301,6 @@ def test_partial_overlap(self, tmpdir, false_positive_check):
 
 @pytest.mark.gpu
 class TestFuzzyDuplicates:
-    @pytest.fixture(autouse=True, scope="class")
-    def gpu_client(self, request):
-        with LocalCUDACluster(n_workers=1) as cluster, Client(cluster) as client:
-            request.cls.client = client
-            request.cls.cluster = cluster
-            yield
-
     @pytest.mark.parametrize("use_64_bit_hash", [False, True])
     @pytest.mark.parametrize(
         "num_buckets,jaccard_threshold,duplicate_docs",
@@ -328,8 +319,9 @@ def test_fuzzy_dedup(
         self,
         fuzzy_dedup_data,
         use_64_bit_hash,
         num_buckets,
         jaccard_threshold,
         duplicate_docs,
         tmpdir,
+        gpu_client,
     ):
-        print(self.client)
+        print(gpu_client)
         # Dedup might fail when indices per partition do not start from 0
         fuzzy_dedup_data.df = fuzzy_dedup_data.df.reset_index(drop=True)
         config = FuzzyDuplicatesConfig(
@@ -408,8 +400,9 @@ def test_different_fields(self, fuzzy_dedup_data, tmpdir):
     def test_non_uniform_indices(
         self,
         tmpdir,
+        gpu_client,
     ):
-        print(self.client)
+        print(gpu_client)
         # Dedup might fail when indices per partition do not start from 0
         df = cudf.DataFrame(
             {
@@ -498,7 +491,13 @@ def test_num_anchors(self, large_fuzzy_dedup_data, num_anchors, tmpdir):
         ],
     )
     def test_no_fp_check(
-        self, fuzzy_dedup_data, use_64_bit_hash, num_buckets, duplicate_docs, tmpdir
+        self,
+        fuzzy_dedup_data,
+        use_64_bit_hash,
+        num_buckets,
+        duplicate_docs,
+        tmpdir,
+        gpu_client,
     ):
         config = FuzzyDuplicatesConfig(
             cache_dir=tmpdir,
@@ -533,6 +532,7 @@ def test_shuffle_fail_fuzzy_dedup_data(
         self,
         shuffle_fail_fuzzy_dedup_data,
         tmpdir,
+        gpu_client,
     ):
         # Dedup might fail when indices per partition do not start from 0
         shuffle_fail_fuzzy_dedup_data.df = shuffle_fail_fuzzy_dedup_data.df.reset_index(
@@ -569,7 +569,7 @@ def test_shuffle_fail_fuzzy_dedup_data(
 
     @pytest.mark.parametrize("false_positive_check", [True, False])
     def test_fuzzy_dedup_no_duplicates(
-        self, no_duplicates_fuzzy_dedup_data, tmpdir, false_positive_check
+        self, no_duplicates_fuzzy_dedup_data, tmpdir, false_positive_check, gpu_client
     ):
         # Dedup might fail when indices per partition do not start from 0
         no_duplicates_fuzzy_dedup_data.df = (
diff --git a/tests/test_semdedup.py b/tests/test_semdedup.py
index 8ccf850a..44f7f555 100644
--- a/tests/test_semdedup.py
+++ b/tests/test_semdedup.py
@@ -27,7 +27,6 @@
 
 cudf = gpu_only_import("cudf")
 dask_cudf = gpu_only_import("dask_cudf")
-LocalCUDACluster = gpu_only_import_from("dask_cuda", "LocalCUDACluster")
 EmbeddingCreator = gpu_only_import_from(
     "nemo_curator.modules.semantic_dedup.embeddings", "EmbeddingCreator"
 )
@@ -55,19 +54,13 @@ def dedup_data():
 
 @pytest.mark.gpu
 class TestSemDuplicates:
-    @pytest.fixture(autouse=True, scope="class")
-    def gpu_client(self, request):
-        with LocalCUDACluster(n_workers=1) as cluster, Client(cluster) as client:
-            request.cls.client = client
-            request.cls.cluster = cluster
-            yield
-
     def test_sem_dedup(
         self,
         dedup_data,
         tmpdir,
+        gpu_client,
     ):
-        print("client", self.client)
+        print("client", gpu_client)
         cache_dir = os.path.join(tmpdir, "test_sem_dedup_cache")
         config = SemDedupConfig(
             cache_dir=cache_dir,
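Note on the pattern above: the new conftest.py fixture is what lets every GPU test share a single LocalCUDACluster per pytest session instead of rebuilding one per test class. The sketch below is a minimal, self-contained illustration of that pattern, not the repository's conftest.py: it swaps NeMo Curator's gpu_only_import helpers for a plain try/except so it runs outside the repo, and the --cpu option, "gpu" marker, and fixture name simply mirror the ones in the patch.

import pytest
from dask.distributed import Client

try:
    from dask_cuda import LocalCUDACluster
except ImportError:  # CPU-only environments never import dask_cuda
    LocalCUDACluster = None


def pytest_configure(config):
    # Register the marker used below so pytest does not warn about it.
    config.addinivalue_line("markers", "gpu: marks tests that require a GPU")


def pytest_addoption(parser):
    parser.addoption("--cpu", action="store_true", help="Run CPU-only tests")


def pytest_collection_modifyitems(config, items):
    # When --cpu is given, skip anything marked @pytest.mark.gpu.
    if config.getoption("--cpu"):
        skip_gpu = pytest.mark.skip(reason="Skipping GPU tests")
        for item in items:
            if "gpu" in item.keywords:
                item.add_marker(skip_gpu)


@pytest.fixture(autouse=True, scope="session")
def gpu_client(request):
    # One cluster/client pair for the whole session; every test that lists
    # `gpu_client` in its signature reuses it, and teardown happens once when
    # the context managers exit at the end of the session.
    if request.config.getoption("--cpu") or LocalCUDACluster is None:
        yield None
        return
    with LocalCUDACluster(n_workers=1) as cluster, Client(cluster) as client:
        yield client

A GPU-marked test then just lists gpu_client in its signature, as the updated tests in this patch do, and receives the shared dask.distributed.Client (or None when the suite runs with --cpu).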