Skip to content

Commit

Permalink
✨ check workers in slurm.
Browse files Browse the repository at this point in the history
  • Loading branch information
perillaroc committed Aug 3, 2023
1 parent 6d5a247 commit 133e858
Showing 1 changed file with 27 additions and 6 deletions.
33 changes: 27 additions & 6 deletions reki_data_tool/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,16 +96,22 @@ def wrapper(*args, **kwargs):
return wrapper


def create_dask_client(engine: str = "local", client_kwargs: Dict = None) -> Client:
def create_dask_client(
engine: str = "local",
client_kwargs: Dict = None
) -> Client:
"""
Create dask client with scheduler and workers.
Parameters
----------
engine
local or mpi (require dask_mpi)
client_kwargs
how to create dask workers:
* local
* mpi (require dask_mpi)
client_kwargs
used as args when create ``dask.distributed``
Returns
-------
Client
Expand All @@ -118,14 +124,29 @@ def create_dask_client(engine: str = "local", client_kwargs: Dict = None) -> Cli
elif engine == "mpi":
from dask_mpi import initialize
initialize(
nanny=False,
protocol='tcp',
# nanny=False,
# protocol='tcp',
interface="ib0",
dashboard=False,
nthreads=1,
local_directory=os.getcwd()
# local_directory=os.getcwd()
local_directory=os.getenv('TMPDIR'), # use ${TMPDIR} in HPC2023
# exit=False,
)
client = Client(**client_kwargs)

# use in SLURM.
# We expect SLURM_NTASKS workers
N = int(os.getenv('SLURM_NTASKS')) - 2

logger.info("wait for workers and report...")
client.wait_for_workers(n_workers=N)
logger.info("wait for workers and report...done")

num_workers = len(client.scheduler_info()['workers'])
logger.info("%d workers available and ready" % num_workers)
# print(client.scheduler_info())

else:
raise ValueError(f"engine is not supported: {engine}")

Expand Down

0 comments on commit 133e858

Please sign in to comment.