From 76ff35377df7beaddff00a079ddda1628e5c1268 Mon Sep 17 00:00:00 2001 From: Praateek Mahajan Date: Fri, 27 Sep 2024 14:27:15 -0700 Subject: [PATCH 1/6] add enable_cudf_spill to LocalCudaCluster Signed-off-by: Praateek Mahajan --- nemo_curator/utils/distributed_utils.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py index dabda543..fb979a8d 100644 --- a/nemo_curator/utils/distributed_utils.py +++ b/nemo_curator/utils/distributed_utils.py @@ -21,7 +21,7 @@ import warnings from contextlib import nullcontext from pathlib import Path -from typing import Dict, List, Union +from typing import TYPE_CHECKING, Dict, List, Union import dask.dataframe as dd import numpy as np @@ -33,7 +33,13 @@ from nemo_curator.utils.import_utils import gpu_only_import, gpu_only_import_from cudf = gpu_only_import("cudf") -LocalCUDACluster = gpu_only_import_from("dask_cuda", "LocalCUDACluster") + + +if TYPE_CHECKING: + from dask_cuda import LocalCUDACluster +else: + LocalCUDACluster = gpu_only_import_from("dask_cuda", "LocalCUDACluster") + get_device_total_memory = gpu_only_import_from( "dask_cuda.utils", "get_device_total_memory" ) @@ -70,14 +76,11 @@ def start_dask_gpu_local_cluster( rmm_pool_size=rmm_pool_size, protocol=protocol, rmm_async=True, + enable_cudf_spill=True, **extra_kwargs, ) client = Client(cluster) - if enable_spilling: - _enable_spilling() - client.run(_enable_spilling) - if set_torch_to_use_rmm: _set_torch_to_use_rmm() client.run(_set_torch_to_use_rmm) From 08fb5baeef3aae3e03b01381b041be5c991fbbfd Mon Sep 17 00:00:00 2001 From: Praateek Mahajan Date: Fri, 27 Sep 2024 14:41:02 -0700 Subject: [PATCH 2/6] remove typecheck Signed-off-by: Praateek Mahajan --- nemo_curator/utils/distributed_utils.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py index fb979a8d..2d1857a3 100644 --- a/nemo_curator/utils/distributed_utils.py +++ b/nemo_curator/utils/distributed_utils.py @@ -33,13 +33,7 @@ from nemo_curator.utils.import_utils import gpu_only_import, gpu_only_import_from cudf = gpu_only_import("cudf") - - -if TYPE_CHECKING: - from dask_cuda import LocalCUDACluster -else: - LocalCUDACluster = gpu_only_import_from("dask_cuda", "LocalCUDACluster") - +LocalCUDACluster = gpu_only_import_from("dask_cuda", "LocalCUDACluster") get_device_total_memory = gpu_only_import_from( "dask_cuda.utils", "get_device_total_memory" ) @@ -76,7 +70,7 @@ def start_dask_gpu_local_cluster( rmm_pool_size=rmm_pool_size, protocol=protocol, rmm_async=True, - enable_cudf_spill=True, + enable_cudf_spill=enable_spilling, **extra_kwargs, ) client = Client(cluster) From 8542f463aa56cf37f4b1deaccedd7f16226a6d50 Mon Sep 17 00:00:00 2001 From: Praateek Mahajan Date: Fri, 27 Sep 2024 14:42:13 -0700 Subject: [PATCH 3/6] remove unused imports Signed-off-by: Praateek Mahajan --- nemo_curator/utils/distributed_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py index 2d1857a3..8556aa88 100644 --- a/nemo_curator/utils/distributed_utils.py +++ b/nemo_curator/utils/distributed_utils.py @@ -21,7 +21,7 @@ import warnings from contextlib import nullcontext from pathlib import Path -from typing import TYPE_CHECKING, Dict, List, Union +from typing import Dict, List, Union import dask.dataframe as dd import numpy as np @@ -29,7 +29,7 @@ import psutil from dask.distributed import Client, LocalCluster, get_worker, performance_report -from nemo_curator.utils.gpu_utils import GPU_INSTALL_STRING, is_cudf_type +from nemo_curator.utils.gpu_utils import is_cudf_type from nemo_curator.utils.import_utils import gpu_only_import, gpu_only_import_from cudf = gpu_only_import("cudf") From 0f6812160221b0370148340a6f371033d4fed139 Mon Sep 17 00:00:00 2001 From: Praateek Mahajan Date: Fri, 27 Sep 2024 14:58:30 -0700 Subject: [PATCH 4/6] remove _enable_spilling Signed-off-by: Praateek Mahajan --- nemo_curator/utils/distributed_utils.py | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py index 8556aa88..d691a2a7 100644 --- a/nemo_curator/utils/distributed_utils.py +++ b/nemo_curator/utils/distributed_utils.py @@ -38,6 +38,8 @@ "dask_cuda.utils", "get_device_total_memory" ) +LocalCUDACluster() + class NoWorkerError(Exception): pass @@ -65,6 +67,7 @@ def start_dask_gpu_local_cluster( if nvlink_only and protocol == "ucx" else {} ) + LocalCUDACluster() cluster = LocalCUDACluster( rmm_pool_size=rmm_pool_size, @@ -190,18 +193,6 @@ def _set_torch_to_use_rmm(): torch.cuda.memory.change_current_allocator(rmm_torch_allocator) -def _enable_spilling(): - """ - Setting this environment variable enables automatic spilling (and "unspilling") - of buffers from device to host to enable out-of-memory computation, - i.e., computing on objects that occupy more memory than is available on the GPU. - - """ - import cudf - - cudf.set_option("spill", True) - - def read_single_partition( files, backend="cudf", From 16bf55e2966ea98bc932939fb277cd27a98f75c6 Mon Sep 17 00:00:00 2001 From: Praateek Mahajan Date: Fri, 27 Sep 2024 14:59:47 -0700 Subject: [PATCH 5/6] remove unused LCU obj Signed-off-by: Praateek Mahajan --- nemo_curator/utils/distributed_utils.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py index d691a2a7..194acce9 100644 --- a/nemo_curator/utils/distributed_utils.py +++ b/nemo_curator/utils/distributed_utils.py @@ -38,8 +38,6 @@ "dask_cuda.utils", "get_device_total_memory" ) -LocalCUDACluster() - class NoWorkerError(Exception): pass From 7c476acc205a8278875dbb549f721d6338e60b08 Mon Sep 17 00:00:00 2001 From: Praateek Mahajan Date: Fri, 27 Sep 2024 15:00:15 -0700 Subject: [PATCH 6/6] remove unused LCC obj Signed-off-by: Praateek Mahajan --- nemo_curator/utils/distributed_utils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py index 194acce9..a847d13e 100644 --- a/nemo_curator/utils/distributed_utils.py +++ b/nemo_curator/utils/distributed_utils.py @@ -65,7 +65,6 @@ def start_dask_gpu_local_cluster( if nvlink_only and protocol == "ucx" else {} ) - LocalCUDACluster() cluster = LocalCUDACluster( rmm_pool_size=rmm_pool_size,