Slurm train scripts (#215)
* finished slurm training script

* add illustration on how to use

* untrack auto generated files at installation

---------

Co-authored-by: zhulei <[email protected]>
rayleizhu and rayleizhu authored Feb 24, 2023
1 parent 629f9cc commit 953acc1
Showing 7 changed files with 372 additions and 1 deletion.
8 changes: 8 additions & 0 deletions .gitignore
@@ -1,3 +1,11 @@


# auto generated file at installation
detrex/config/configs
detrex/version.py
# experiment dirs
outputs/

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
15 changes: 15 additions & 0 deletions configs/hydra/slurm/research.yaml
@@ -0,0 +1,15 @@

partition: research # Partition to submit the job to
ngpus: ${num_gpus} # Number of GPUs to request on each node
nodes: ${num_machines} # Number of nodes to request
cpus_per_task: 5 # Number of CPUs per task/GPU
timeout: 240 # Duration of the job, in hours
job_name: "detrex" # Job name displayed by `squeue`
job_dir: ~ # Job directory; leave empty for the default (hydra.run.dir)
exclude_node: ~ # Node(s) to exclude from slurm assignment, e.g. SH-IDC1-10-198-3-[10,20]
comment: ~ # Comment to pass to the scheduler, e.g. a priority message
quotatype: ~ # Some clusters use different quota types with different priorities, e.g. reserved/spot

ddp_comm_mode: "tcp" # ddp communication mode, "file" or "tcp"
share_root: /path/that/can/be/accessed/by/all/machines # for "file" mode only
master_port: ~ # for "tcp" mode only, leave empty to find available port automatically
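
The `${num_gpus}` / `${num_machines}` entries are OmegaConf interpolations that resolve against the top-level keys of `configs/hydra/train_args.yaml` once Hydra composes the configs via `+slurm=<name>`. Below is a minimal sketch of that resolution with plain OmegaConf, outside Hydra; the override values 8 and 2 are made up for illustration.

```
# Sketch only: how the slurm config's interpolations resolve against
# train_args.yaml when both files are loaded manually from the repo root.
from omegaconf import OmegaConf

train_args = OmegaConf.load("configs/hydra/train_args.yaml")   # defines num_gpus, num_machines, ...
slurm_cfg = OmegaConf.load("configs/hydra/slurm/research.yaml")

cfg = OmegaConf.merge(train_args, {"slurm": slurm_cfg})        # roughly what `+slurm=research` composes
cfg.num_gpus, cfg.num_machines = 8, 2                          # stand-ins for command-line overrides

print(cfg.slurm.ngpus, cfg.slurm.nodes)                        # -> 8 2 (interpolations resolve on access)
```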
35 changes: 35 additions & 0 deletions configs/hydra/train_args.yaml
@@ -0,0 +1,35 @@
######### converted from default argparse args ###########
# config_file: ''
config_file: ${pycfg_dir}/${pycfg_file}
resume: false
eval_only: false
num_gpus: 1
num_machines: 1
machine_rank: 0
dist_url: tcp://127.0.0.1:24999
opts: []
############################################################

# aux params for easier management of overrides
pycfg_dir: projects/detr/configs
pycfg_file: detr_r50_300ep.py

# use automatic experiment name / output dir
auto_output_dir: True

hydra:
  run:
    # https://hydra.cc/docs/configure_hydra/workdir/
    dir: "outputs/${hydra.job.override_dirname}/${now:%Y%m%d-%H:%M:%S}"
  job:
    config:
      override_dirname:
        kv_sep: '.'
        item_sep: '-'
        exclude_keys:
          - config_file
          - pycfg_dir
          - slurm
          - slurm.quotatype
          - dist_url
          - auto_output_dir
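
With `kv_sep: '.'` and `item_sep: '-'`, Hydra's `override_dirname` joins the non-excluded command-line overrides into a single directory name, which is what produces run dirs like `outputs/+model.num_queries.50-num_gpus.8-num_machines.2/<timestamp>` (see the example tree in the launcher's docstring further down). A rough sketch of the naming rule, ignoring Hydra's own sorting and escaping details; `override_dirname` below is an illustrative re-implementation, not Hydra's:

```
# Illustrative re-implementation of the override_dirname convention configured
# above; Hydra builds this string itself, so this is a sketch only.
def override_dirname(overrides, kv_sep=".", item_sep="-", exclude_keys=()):
    kept = []
    for override in overrides:                    # e.g. "+model.num_queries=50"
        key, _, value = override.partition("=")
        if key.lstrip("+") not in exclude_keys:   # excluded keys never reach the dir name
            kept.append(f"{key}{kv_sep}{value}")
    return item_sep.join(kept)

print(override_dirname(
    ["+model.num_queries=50", "num_gpus=8", "num_machines=2"],
    exclude_keys=("config_file", "pycfg_dir", "slurm", "slurm.quotatype",
                  "dist_url", "auto_output_dir"),
))
# -> +model.num_queries.50-num_gpus.8-num_machines.2
```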
73 changes: 73 additions & 0 deletions detrex/utils/dist.py
@@ -22,6 +22,14 @@

import torch.distributed as dist

import os
import torch
import builtins
import datetime
import subprocess

from detectron2.utils import comm


def get_rank() -> int:
    if not dist.is_available():
@@ -50,3 +58,68 @@ def get_world_size() -> int:
    if not is_dist_avail_and_initialized():
        return 1
    return dist.get_world_size()


def setup_for_distributed(is_master):
"""
This function disables printing when not in master process
"""
builtin_print = builtins.print

def print(*args, **kwargs):
force = kwargs.pop('force', False)
force = force or (get_world_size() > 8)
if is_master or force:
now = datetime.datetime.now().time()
builtin_print('[{}] '.format(now), end='') # print with time stamp
builtin_print(*args, **kwargs)

builtins.print = print


def slurm_init_distributed_mode(args):

    assert 'SLURM_PROCID' in os.environ
    assert hasattr(args, 'slurm')

    ######################################
    # NOTE: using file://xxxx as dist_url is not stable
    # https://shomy.top/2022/01/05/torch-ddp-intro/
    if args.slurm.ddp_comm_mode == 'tcp':
        node_list = os.environ['SLURM_NODELIST']
        master_addr = subprocess.getoutput(f'scontrol show hostname {node_list} | head -n1')

        # explicit tcp url
        args.dist_url = "tcp://%s:%s" % (master_addr, args.slurm.master_port)

        # alternatively, use env vars as below
        # os.environ['MASTER_ADDR'] = master_addr
        # os.environ['MASTER_PORT'] = f'{args.slurm.master_port}'
        # os.environ['RANK'] = str(args.rank)
        # os.environ['LOCAL_RANK'] = str(args.rank % torch.cuda.device_count())
        # os.environ['WORLD_SIZE'] = str(args.world_size)
        # args.dist_url = "env://"
    ######################################

    args.rank = int(os.environ['SLURM_PROCID'])
    args.gpu = args.rank % torch.cuda.device_count()

    torch.cuda.set_device(args.gpu)
    print('| distributed init (rank {}): {}, gpu {}'.format(
        args.rank, args.dist_url, args.gpu), flush=True)
    dist.init_process_group(backend='nccl', init_method=args.dist_url,
                            world_size=args.world_size, rank=args.rank)

    assert comm._LOCAL_PROCESS_GROUP is None
    n_gpus_per_machine = args.slurm.ngpus
    num_machines = args.world_size // n_gpus_per_machine
    machine_rank = args.rank // n_gpus_per_machine
    for i in range(num_machines):
        ranks_on_i = list(range(i * n_gpus_per_machine, (i + 1) * n_gpus_per_machine))
        pg = dist.new_group(ranks_on_i)
        if i == machine_rank:
            comm._LOCAL_PROCESS_GROUP = pg
    comm.synchronize()

    # torch.distributed.barrier()
    setup_for_distributed(args.rank == 0)
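
For reference, `slurm_init_distributed_mode` expects the caller (the launcher script added below, `tools/hydra_train_net.py`) to have filled in `rank`, `world_size`, `dist_url`, and the `slurm` sub-config before the call. A minimal sketch of such an `args` object, with hypothetical values; it would only actually run inside a Slurm task, with `SLURM_PROCID`/`SLURM_NODELIST` set and GPUs visible:

```
# Sketch only: the fields slurm_init_distributed_mode reads and writes, with
# hypothetical values; in practice Trainer._setup_gpu_args fills these in from
# submitit's JobEnvironment before the call.
from omegaconf import OmegaConf
from detrex.utils.dist import slurm_init_distributed_mode

args = OmegaConf.create({
    "rank": 0,             # overwritten from SLURM_PROCID inside the function
    "world_size": 16,      # nodes * ngpus, e.g. 2 nodes x 8 GPUs
    "dist_url": "env://",  # rebuilt as tcp://<master>:<port> in "tcp" mode
    "slurm": {"ddp_comm_mode": "tcp", "ngpus": 8, "master_port": 24999},
})
OmegaConf.set_struct(args, False)  # the function also writes args.gpu

slurm_init_distributed_mode(args)  # requires a Slurm environment and GPUs
```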
3 changes: 2 additions & 1 deletion requirements.txt
@@ -11,4 +11,5 @@ pytest
scipy==1.7.3
psutil
opencv-python
wandb
wandb
submitit
Empty file added tools/__init__.py
Empty file.
239 changes: 239 additions & 0 deletions tools/hydra_train_net.py
@@ -0,0 +1,239 @@
#!/usr/bin/env python
"""
A script to launch training. It supports:
* a one-line command to launch training locally or on a slurm cluster
* automatic experiment name generation from hyperparameter overrides
* automatic requeueing & resuming from the latest checkpoint when a job reaches its maximum running time or is preempted
Example usage:
STEP 1: modify the slurm config
```
$ cp configs/hydra/slurm/research.yaml configs/hydra/slurm/${CLUSTER_ID}.yaml && \
vim configs/hydra/slurm/${CLUSTER_ID}.yaml
```
STEP 2: launch training
```
$ python tools/hydra_train_net.py \
num_machines=2 num_gpus=8 auto_output_dir=true \
config_file=projects/detr/configs/detr_r50_300ep.py \
+model.num_queries=50 \
+slurm=${CLUSTER_ID}
```
STEP 3 (optional): check output dir
```
$ tree -L 2 ./outputs/
./outputs/
└── +model.num_queries.50-num_gpus.8-num_machines.2
    └── 20230224-09:06:28
```
Contact ZHU Lei ([email protected]) for inquiries about this script.
"""

import sys
import os
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir)))
# print(sys.path)

# FIXME: it seems that, even though I put tools/.. at the front of PYTHONPATH, the interpreter still
# ends up finding detectron2/tools/train_net. Two workarounds:
# 1. pip uninstall detrex && pip uninstall detectron2 && pip install detrex && pip install detectron2 (tested)
# 2. PYTHONPATH=${PWD}:${PYTHONPATH} python tools/hydra_train_net.py ... (not tested)
from tools.train_net import main

import hydra
from hydra.utils import get_original_cwd
from omegaconf import OmegaConf, DictConfig

from detectron2.engine import launch
from detectron2.config import LazyConfig
import os.path as osp

import submitit
import uuid
from pathlib import Path

from detrex.utils.dist import slurm_init_distributed_mode


def _find_free_port():
    import socket

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Binding to port 0 will cause the OS to find an available port for us
    sock.bind(("", 0))
    port = sock.getsockname()[1]
    sock.close()
    # NOTE: there is still a chance the port could be taken by other processes.
    return port

def get_shared_folder(share_root) -> Path:
    if Path(share_root).parent.is_dir():
        p = Path(f"{share_root}")
        p.mkdir(exist_ok=True)
        return p
    raise RuntimeError(f"The parent of share_root ({share_root}) must exist!")

def get_init_file(share_root):
    # Init file must not exist, but its parent dir must exist.
    os.makedirs(str(get_shared_folder(share_root)), exist_ok=True)
    init_file = get_shared_folder(share_root) / f"{uuid.uuid4().hex}_init"
    if init_file.exists():
        os.remove(str(init_file))
    return init_file

def get_dist_url(ddp_comm_mode='tcp', share_root=None):
    if ddp_comm_mode == 'file':
        assert share_root is not None
        return get_init_file(share_root).as_uri()
    elif ddp_comm_mode == 'tcp':
        return 'env://'
    else:
        raise ValueError('Unknown DDP communication mode')

class Trainer(object):
    def __init__(self, args):
        self.args = args

    def __call__(self):
        self._setup_gpu_args()
        if self.args.world_size > 1:
            slurm_init_distributed_mode(self.args)
        if not self.args.eval_only:  # always auto-resume when training
            self.args.resume = True
        main(self.args)

    def checkpoint(self):  # called when a timeout or a preemption signal is received
        import os
        import submitit

        self.args.dist_url = get_dist_url(
            ddp_comm_mode=self.args.slurm.ddp_comm_mode,
            share_root=self.args.slurm.share_root)

        self.args.resume = True
        print("Requeuing ", self.args)
        empty_trainer = type(self)(self.args)
        return submitit.helpers.DelayedSubmission(empty_trainer)

    def _setup_gpu_args(self):
        import submitit

        job_env = submitit.JobEnvironment()
        # https://shomy.top/2022/01/05/torch-ddp-intro/
        # self.args.dist_url = f'tcp://{job_env.hostname}:{self.args.slurm.port}'
        # self.args.output_dir = self.args.slurm.job_dir
        self.args.gpu = job_env.local_rank
        self.args.rank = job_env.global_rank
        self.args.world_size = job_env.num_tasks
        self.args.machine_rank = job_env.node

        self.args.slurm.jobid = job_env.job_id  # kept in case it is needed, e.g. for logging to wandb

        print(f"Process group: {job_env.num_tasks} tasks, rank: {job_env.global_rank}")


# @hydra.main(version_base=None, config_path="../configs/hydra", config_name="train_args.yaml")
@hydra.main(config_path="../configs/hydra", config_name="train_args.yaml")
def hydra_app(args: DictConfig):
    # NOTE: allow writing to unknown fields of cfg,
    # so that it behaves like an argparse.Namespace;
    # this is required because some args are determined at runtime
    # https://stackoverflow.com/a/66296809
    OmegaConf.set_struct(args, False)

    # TODO: switch to hydra 1.3+, which naturally supports relative paths;
    # the following workaround is for hydra 1.1.2
    hydra_cfg = hydra.core.hydra_config.HydraConfig.get()
    # hydra 1.1.2 changes PWD to the run dir, so resolve config_file against the original working dir
    args.config_file = osp.join(get_original_cwd(), args.config_file)

    # command-line args starting with '+' are treated as config overrides, except '+slurm=[cluster_id]'
    args.opts = [x.replace('+', '') for x in hydra_cfg['overrides']['task']
                 if (x.startswith('+') and not x.startswith('+slurm'))]
    # print(args.opts)

    hydra_run_dir = os.path.join(get_original_cwd(), hydra_cfg['run']['dir'])
    if args.auto_output_dir:
        args.opts.append(f"train.output_dir={hydra_run_dir}")
    # print(args.opts)

    # test args
    # print(OmegaConf.to_yaml(args, resolve=True))

    if not hasattr(args, 'slurm'):  # run locally
        launch(
            main,
            args.num_gpus,
            num_machines=args.num_machines,
            machine_rank=args.machine_rank,
            dist_url=args.dist_url,
            args=(args,),
        )
    else:  # run with slurm
        if args.slurm.job_dir is None:  # use hydra run_dir as slurm output dir
            hydra_cfg = hydra.core.hydra_config.HydraConfig.get()
            args.slurm.job_dir = hydra_run_dir

        if args.slurm.master_port is None:  # automatically find free port for ddp communication
            args.slurm.master_port = _find_free_port()

        executor = submitit.AutoExecutor(folder=args.slurm.job_dir, slurm_max_num_timeout=30)

        ############## NOTE: this part is highly dependent on the slurm version ##############
        kwargs = {}
        if args.slurm.comment:
            kwargs['slurm_comment'] = args.slurm.comment

        # NOTE: different slurm versions may support different flags;
        # slurm_additional_parameters is flexible enough to cope with this
        slurm_additional_parameters = {
            'ntasks': args.slurm.nodes * args.slurm.ngpus,
            'gres': f'gpu:{args.slurm.ngpus}',
            'ntasks-per-node': args.slurm.ngpus,  # one task per GPU
        }
        if args.slurm.exclude_node:
            slurm_additional_parameters['exclude'] = args.slurm.exclude_node

        if args.slurm.quotatype:
            slurm_additional_parameters['quotatype'] = args.slurm.quotatype
        ##################################################################################

        executor.update_parameters(
            ## original
            # mem_gb=40 * num_gpus_per_node,
            # gpus_per_node=num_gpus_per_node,
            # tasks_per_node=num_gpus_per_node,  # one task per GPU
            # nodes=nodes,
            # timeout_min=timeout_min,  # max is 60 * 72
            ## https://github.com/facebookincubator/submitit/issues/1639
            # mem_per_cpu=4000,
            # gpus_per_node=num_gpus_per_node,
            # cpus_per_task=4,
            cpus_per_task=args.slurm.cpus_per_task,
            nodes=args.slurm.nodes,
            slurm_additional_parameters=slurm_additional_parameters,
            timeout_min=args.slurm.timeout * 60,  # convert the timeout (hours) to minutes
            # Below are cluster-dependent parameters
            slurm_partition=args.slurm.partition,
            slurm_signal_delay_s=120,
            **kwargs
        )

        executor.update_parameters(name=args.slurm.job_name)

        args.dist_url = get_dist_url(
            ddp_comm_mode=args.slurm.ddp_comm_mode,
            share_root=args.slurm.share_root)

        trainer = Trainer(args)
        job = executor.submit(trainer)

        print("Submitted job_id:", job.job_id)


if __name__ == '__main__':
    hydra_app()
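
The auto-requeue behaviour relies on submitit's checkpointing contract: when a job hits `timeout_min` or receives a preemption signal, submitit calls the submitted callable's `checkpoint()` method and resubmits whatever `DelayedSubmission` it returns, which is what `Trainer.checkpoint` above does. A stripped-down, detrex-independent sketch of that contract; the `CountingTask` class, the folder, and the parameter values are made up for illustration:

```
# Stand-alone sketch of the submitit checkpoint/requeue contract used by
# Trainer above; CountingTask, the folder, and the parameters are illustrative.
import submitit

class CountingTask:
    def __init__(self, start=0):
        self.start = start

    def __call__(self):
        for step in range(self.start, 1_000_000):
            self.start = step  # stand-in for a real training loop + checkpointing

    def checkpoint(self):
        # Called by submitit on timeout/preemption: resubmit the same callable,
        # carrying over whatever progress state it holds.
        return submitit.helpers.DelayedSubmission(type(self)(self.start))

executor = submitit.AutoExecutor(folder="outputs/submitit_demo", slurm_max_num_timeout=30)
executor.update_parameters(timeout_min=60, slurm_partition="research")
job = executor.submit(CountingTask())
print("Submitted job_id:", job.job_id)
```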
