Merge pull request #775 from LBL-EESA/perlmutter_scheduling
Perlmutter scheduling
burlen authored Aug 31, 2023
2 parents 07a119a + fa1c209 commit 90500dc
Showing 18 changed files with 344 additions and 145 deletions.
2 changes: 1 addition & 1 deletion alg/teca_bayesian_ar_detect.cxx
@@ -692,7 +692,7 @@ void teca_bayesian_ar_detect::set_modified()
void teca_bayesian_ar_detect::set_thread_pool_size(int n)
{
this->internals->queue = new_teca_data_request_queue(
this->get_communicator(), n, -1, true, this->get_verbose());
this->get_communicator(), n, -1, -1, true, this->get_verbose());
}

// --------------------------------------------------------------------------
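For context: the added -1 threads a new ranks_per_device setting through the request queue factory. Below is a minimal annotated sketch of the updated call (not part of the commit); the positional argument names are inferred from the thread_parameters() changes later in this diff and are not spelled out at this call site.

    // assumed meaning of the positional arguments after this change:
    //   n    -- number of threads in the pool
    //   -1   -- threads_per_device: use the default of 8 threads per GPU
    //   -1   -- ranks_per_device: let every rank on the node share the GPUs
    //   true -- bind each thread to a core
    this->internals->queue = new_teca_data_request_queue(
        this->get_communicator(), n, -1, -1, true, this->get_verbose());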
2 changes: 0 additions & 2 deletions alg/teca_descriptive_statistics.cxx
@@ -306,8 +306,6 @@ const_p_teca_dataset teca_descriptive_statistics::execute(
atrs.set("step", (teca_metadata)step_atts);

table->get_metadata().set("attributes", atrs);

atrs.to_stream(std::cerr);
}

// for each variable
2 changes: 1 addition & 1 deletion alg/teca_pytorch_algorithm.py
@@ -145,7 +145,7 @@ def initialize(self):
n_threads = self.n_threads

n_threads, affinity, device_ids = \
thread_util.thread_parameters(comm, n_threads, 1, -1,
thread_util.thread_parameters(comm, n_threads, 1, -1, -1,
0 if self.verbose < 2 else 1)

# let the user request a bound on the number of threads
9 changes: 6 additions & 3 deletions core/teca_cpu_thread_pool.h
@@ -123,7 +123,7 @@ void teca_cpu_thread_pool<task_t, data_t>::create_threads(MPI_Comm comm,
std::vector<int> device_ids;

if (teca_thread_util::thread_parameters(comm, -1,
n_requested, bind, -1, verbose, n_threads, core_ids,
n_requested, -1, -1, bind, verbose, n_threads, core_ids,
device_ids))
{
TECA_WARNING("Failed to detetermine thread parameters."
@@ -229,9 +229,12 @@ int teca_cpu_thread_pool<task_t, data_t>::wait_some(long n_to_wait,
}
}

// if we have not accumulated the requested number of datasets
// we have the requested number of datasets
if (data.size() >= static_cast<unsigned int>(n_to_wait))
break;

// wait for the user supplied duration before re-scanning
if (thread_valid && (data.size() < static_cast<unsigned int>(n_to_wait)))
if (thread_valid)
std::this_thread::sleep_for(std::chrono::nanoseconds(poll_interval));
}

40 changes: 25 additions & 15 deletions core/teca_cuda_thread_pool.h
@@ -67,18 +67,21 @@ class TECA_EXPORT teca_cuda_thread_pool
* ranks running on the same node are taken into
* account, resulting in 1 thread per core node wide.
*
* @param[in] n_threads_per_device number of threads to assign to each CUDA
* device. If 0 no threads will service
* CUDA devices, this is CPU only mode.
* If -1 the default of 8 threads per
* device will be used.
* @param[in] threads_per_device number of threads to assign to each
* GPU/device. If 0 only CPUs will be used.
* If -1 the default of 8 threads per device
* will be used.
*
* @param[in] ranks_per_device the number of MPI ranks to allow access to
* to each device/GPU.
*
* @param[in] bind bind each thread to a specific core.
*
* @param[in] verbose print a report of the thread to core bindings
*/
teca_cuda_thread_pool(MPI_Comm comm, int n_threads,
int n_threads_per_device, bool bind, bool verbose);
int threads_per_device, int ranks_per_device, bool bind,
bool verbose);

// get rid of copy and asignment
TECA_ALGORITHM_DELETE_COPY_ASSIGN(teca_cuda_thread_pool)
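The widened constructor can be exercised as below. This is a minimal sketch (not part of the commit) based only on the parameter documentation above; task_t and data_t stand in for whatever task and dataset types a concrete pool is instantiated with, and the chosen values are illustrative.

    // a minimal sketch, assuming task_t/data_t are the pool's task and payload types
    teca_cuda_thread_pool<task_t, data_t> pool(MPI_COMM_WORLD,
        -1,    // n_threads: size the pool from the cores available on the node
        -1,    // threads_per_device: -1 selects the default of 8 threads per GPU,
               //                     0 would run CPU only
        -1,    // ranks_per_device: -1 lets every rank access a GPU, sharing when
               //                   there are more ranks than devices
        true,  // bind each thread to a specific core
        true); // print a report of the thread to core bindings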
@@ -114,7 +117,8 @@ class TECA_EXPORT teca_cuda_thread_pool
private:
/// create n threads for the pool
void create_threads(MPI_Comm comm, int n_threads,
int n_threads_per_device, bool bind, bool verbose);
int threads_per_device, int ranks_per_device, bool bind,
bool verbose);

private:
long m_num_futures;
@@ -128,15 +132,18 @@
// --------------------------------------------------------------------------
template <typename task_t, typename data_t>
teca_cuda_thread_pool<task_t, data_t>::teca_cuda_thread_pool(MPI_Comm comm,
int n_threads, int n_threads_per_device, bool bind, bool verbose) : m_live(true)
int n_threads, int threads_per_device, int ranks_per_device, bool bind,
bool verbose) : m_live(true)
{
this->create_threads(comm, n_threads, n_threads_per_device, bind, verbose);
this->create_threads(comm, n_threads, threads_per_device,
ranks_per_device, bind, verbose);
}

// --------------------------------------------------------------------------
template <typename task_t, typename data_t>
void teca_cuda_thread_pool<task_t, data_t>::create_threads(MPI_Comm comm,
int n_requested, int n_per_device, bool bind, bool verbose)
int n_requested, int threads_per_device, int ranks_per_device, bool bind,
bool verbose)
{
m_num_futures = 0;

@@ -146,8 +153,8 @@ void teca_cuda_thread_pool<task_t, data_t>::create_threads(MPI_Comm comm,
std::deque<int> core_ids;
std::vector<int> device_ids;

if (teca_thread_util::thread_parameters(comm, -1,
n_requested, n_per_device, bind, verbose, n_threads,
if (teca_thread_util::thread_parameters(comm, -1, n_requested,
threads_per_device, ranks_per_device, bind, verbose, n_threads,
core_ids, device_ids))
{
TECA_WARNING("Failed to detetermine thread parameters."
@@ -228,7 +235,7 @@ int teca_cuda_thread_pool<task_t, data_t>::wait_some(long n_to_wait,

// gather the requested number of datasets
size_t thread_valid = 1;
while (thread_valid)
while (thread_valid && ((data.size() < static_cast<unsigned int>(n_to_wait))))
{
{
thread_valid = 0;
Expand All @@ -252,9 +259,12 @@ int teca_cuda_thread_pool<task_t, data_t>::wait_some(long n_to_wait,
}
}

// if we have not accumulated the requested number of datasets
// we have the requested number of datasets
if (data.size() >= static_cast<unsigned int>(n_to_wait))
break;

// wait for the user supplied duration before re-scanning
if (thread_valid && (data.size() < static_cast<unsigned int>(n_to_wait)))
if (thread_valid)
std::this_thread::sleep_for(std::chrono::nanoseconds(poll_interval));
}

53 changes: 23 additions & 30 deletions core/teca_cuda_util.cu
@@ -20,6 +20,10 @@ int synchronize()
int get_local_cuda_devices(MPI_Comm comm, int &ranks_per_device,
std::vector<int> &local_dev)
{
// if ranks per device is zero this is a CPU only run
if (ranks_per_device == 0)
return 0;

cudaError_t ierr = cudaSuccess;

// get the number of CUDA GPU's available on this node
@@ -55,54 +59,43 @@ int get_local_cuda_devices(MPI_Comm comm, int &ranks_per_device,
MPI_Comm_size(node_comm, &n_node_ranks);
MPI_Comm_rank(node_comm, &node_rank);

// adjust the number of devices such that multiple ranks may share a
// device.
if (ranks_per_device < 0)
{
ranks_per_device = n_node_ranks / n_node_dev +
(n_node_ranks % n_node_dev ? 1 : 0);

// limit to at most 8 MPI ranks per GPU
ranks_per_device = std::min(ranks_per_device, 8);
}

int n_node_dev_use = n_node_dev * ranks_per_device;

if (n_node_dev_use >= n_node_ranks)
if (n_node_ranks < n_node_dev)
{
// more devices than ranks,
// assign devices evenly between ranks
int max_dev = n_node_dev_use - 1;
int n_per_rank = std::max(n_node_dev_use / n_node_ranks, 1);
int n_larger = n_node_dev_use % n_node_ranks;
int n_per_rank = n_node_dev / n_node_ranks;
int n_larger = n_node_dev % n_node_ranks;
int n_local = n_per_rank + (node_rank < n_larger ? 1 : 0);

int first_dev = n_per_rank * node_rank
+ (node_rank < n_larger ? node_rank : n_larger);

first_dev = std::min(max_dev, first_dev);

int last_dev = first_dev + n_per_rank - 1
+ (node_rank < n_larger ? 1 : 0);

last_dev = std::min(max_dev, last_dev);

for (int i = first_dev; i <= last_dev; ++i)
local_dev.push_back( i % n_node_dev );
for (int i = 0; i < n_local; ++i)
local_dev.push_back(first_dev + i);
}
else
{
// assign at most one MPI rank per GPU
if (node_rank < n_node_dev_use)
// TODO -- automatic settings
if (ranks_per_device < 0)
ranks_per_device *= -1;

// more ranks than devices. round robin assignment such that at
// most each device has ranks_per_device. the remaining ranks will
// be CPU only.
if (node_rank < ranks_per_device * n_node_dev)
{
local_dev.push_back( node_rank % n_node_dev );
}
}

MPI_Comm_free(&node_comm);

return 0;
}
else
#endif
// without MPI this process can use all CUDA devices
if (ranks_per_device != 0)
{
// without MPI this process can use all CUDA devices
for (int i = 0; i < n_node_dev; ++i)
local_dev.push_back(i);
}
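The restructured assignment above is easier to verify when pulled out into a free function. The following sketch is not code from the commit; it reproduces the even-split case (more devices than ranks) and the round-robin case (more ranks than devices) so the behavior can be checked in isolation.

    #include <vector>

    std::vector<int> assign_devices(int n_node_ranks, int node_rank,
        int n_node_dev, int ranks_per_device)
    {
        std::vector<int> local_dev;

        // ranks_per_device == 0 selects a CPU only run; no GPUs means the same
        if ((ranks_per_device == 0) || (n_node_dev < 1))
            return local_dev;

        if (n_node_ranks < n_node_dev)
        {
            // more devices than ranks: split the devices evenly, with earlier
            // ranks absorbing the remainder
            int n_per_rank = n_node_dev / n_node_ranks;
            int n_larger = n_node_dev % n_node_ranks;
            int n_local = n_per_rank + (node_rank < n_larger ? 1 : 0);

            int first_dev = n_per_rank * node_rank
                + (node_rank < n_larger ? node_rank : n_larger);

            for (int i = 0; i < n_local; ++i)
                local_dev.push_back(first_dev + i);
        }
        else
        {
            // more ranks than devices: a negative request is treated as its
            // magnitude, then at most ranks_per_device ranks are assigned to
            // each GPU round robin; the remaining ranks stay CPU only
            if (ranks_per_device < 0)
                ranks_per_device *= -1;

            if (node_rank < ranks_per_device * n_node_dev)
                local_dev.push_back(node_rank % n_node_dev);
        }

        return local_dev;
    }

For example, with 4 devices and 8 ranks on a node, ranks_per_device = 1 gives ranks 0 through 3 one GPU each and leaves ranks 4 through 7 CPU only, while ranks_per_device = 2 gives every rank a GPU with two ranks sharing each device.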
15 changes: 7 additions & 8 deletions core/teca_cuda_util.h
@@ -19,18 +19,17 @@ namespace teca_cuda_util
{
/** Query the system for the locally available(on this rank) CUDA device count.
* this is an MPI collective call which returns a set of device ids that can be
* used locally. If there are as many (or more than) devices on the node than
* the number of MPI ranks assigned to the node the list of device ids will be
* unique across MPI ranks on the node. Otherwise devices are assigned round
* robin fashion.
* used locally. Node wide coordination assures that one can put a limit on the
* number of ranks per node.
*
* @param[in] comm MPI communicator defining a set of nodes on which need
* access to the available GPUS
* @param[in,out] ranks_per_device The number of MPI ranks to use per CUDA
* device. Passing -1 will assign all MPI
* ranks a GPU up to a maximum of 8 ranks
* per GPU. The number of ranks per GPU
* used is returned through this argument.
* device. When set to 0 no GPUs are used. When
* set to -1 all ranks are assigned a GPU but
* multiple ranks will share a GPU when there
* are more ranks than devices.
*
* @param[out] local_dev a list of device ids that can be used by the calling
* MPI rank.
* @returns non-zero on error.
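A minimal sketch of the collective call described above (not part of the commit), assuming the teca_cuda_util.h header is included and MPI has been initialized:

    int ranks_per_device = -1;   // let every rank use a GPU, sharing as needed
    std::vector<int> local_dev;

    if (teca_cuda_util::get_local_cuda_devices(MPI_COMM_WORLD,
        ranks_per_device, local_dev))
    {
        // non-zero return indicates an error
    }

    // local_dev now holds the CUDA device ids this rank may use; an empty
    // list (for instance when ranks_per_device was 0) means this rank
    // should run CPU only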
32 changes: 19 additions & 13 deletions core/teca_thread_util.cxx
@@ -339,11 +339,12 @@ int generate_report(MPI_Comm comm, int local_proc, int base_id,

// **************************************************************************
int thread_parameters(MPI_Comm comm, int base_core_id, int n_requested,
int n_per_device, bool bind, bool verbose, int &n_threads,
std::deque<int> &affinity, std::vector<int> &device_ids)
int threads_per_device, int ranks_per_device, bool bind, bool verbose,
int &n_threads, std::deque<int> &affinity, std::vector<int> &device_ids)
{
#if !defined(TECA_HAS_CUDA)
(void) n_per_device;
(void) threads_per_device;
(void) ranks_per_device;
#endif
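A sketch of a call with the widened signature (not part of the commit); the argument order follows the callers shown elsewhere in this diff and the values are illustrative only.

    int n_threads = 0;
    std::deque<int> affinity;
    std::vector<int> device_ids;

    if (teca_thread_util::thread_parameters(MPI_COMM_WORLD,
        -1,     // base_core_id: detect automatically
        -1,     // n_requested: size the pool from the cores and ranks on the node
        -1,     // threads_per_device: default of 8 threads per GPU
        -1,     // ranks_per_device: all ranks may use a GPU, shared as needed
        true,   // bind threads to cores
        false,  // verbose
        n_threads, affinity, device_ids))
    {
        // a non-zero return reports failure, as in the callers above that
        // issue a TECA_WARNING
    }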

// this rank is excluded from computations
@@ -379,7 +380,6 @@ int thread_parameters(MPI_Comm comm, int base_core_id, int n_requested,

#if defined(TECA_HAS_CUDA)
// check for an override to the default number of MPI ranks per device
int ranks_per_device = 1;
int ranks_per_device_set = teca_system_util::get_environment_variable
("TECA_RANKS_PER_DEVICE", ranks_per_device);

@@ -404,14 +404,14 @@ int thread_parameters(MPI_Comm comm, int base_core_id, int n_requested,
int default_per_device = 8;

if (!teca_system_util::get_environment_variable
("TECA_THREADS_PER_DEVICE", n_per_device) &&
("TECA_THREADS_PER_DEVICE", threads_per_device) &&
verbose && (rank == 0))
{
TECA_STATUS("TECA_THREADS_PER_DEVICE = " << n_per_device)
TECA_STATUS("TECA_THREADS_PER_DEVICE = " << threads_per_device)
}

int n_device_threads = n_cuda_devices *
(n_per_device < 0 ? default_per_device : n_per_device);
(threads_per_device < 0 ? default_per_device : threads_per_device);

#endif
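The net effect of the override and the default is a device-servicing thread count that can be summarized as follows. This is a sketch only (not part of the commit): std::getenv stands in for teca_system_util::get_environment_variable, and the default of 8 comes from default_per_device above.

    #include <cstdlib>
    #include <string>

    int device_thread_count(int n_cuda_devices, int threads_per_device)
    {
        // TECA_THREADS_PER_DEVICE, when set, overrides the argument
        if (const char *s = std::getenv("TECA_THREADS_PER_DEVICE"))
            threads_per_device = std::stoi(s);

        int default_per_device = 8;
        return n_cuda_devices *
            (threads_per_device < 0 ? default_per_device : threads_per_device);
    }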

@@ -528,16 +528,22 @@

// thread pool size is based on core and process count
int nlg = 0;
// map threads to physical cores
nlg = cores_per_node % n_procs;
n_threads = cores_per_node / n_procs + (proc_id < nlg ? 1 : 0);
if (n_requested > 0)
{
// user specified override
// use exactly this many
n_threads = n_requested;
}
else if (n_requested < -1)
{
// use at most this many
n_threads = std::min(-n_requested, n_threads);
n_requested = n_threads;
}
else
{
// map threads to physical cores
nlg = cores_per_node % n_procs;
n_threads = cores_per_node/n_procs + (proc_id < nlg ? 1 : 0);
}
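The three branches above reduce to a small selection rule. A standalone sketch (not part of the commit) that isolates it:

    #include <algorithm>

    int select_thread_count(int cores_per_node, int n_procs, int proc_id,
        int n_requested)
    {
        // automatic sizing: divide the node's cores among the ranks on the
        // node, with earlier ranks absorbing the remainder
        int nlg = cores_per_node % n_procs;
        int n_threads = cores_per_node / n_procs + (proc_id < nlg ? 1 : 0);

        if (n_requested > 0)
            n_threads = n_requested;                        // use exactly this many
        else if (n_requested < -1)
            n_threads = std::min(-n_requested, n_threads);  // use at most this many

        return n_threads;
    }

On a 64 core node with 4 ranks per node, n_requested = -4 yields 4 threads per rank, while the automatic path (n_requested = -1) yields 16.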

// if the user runs more MPI ranks than cores some of the ranks
@@ -560,7 +566,7 @@ int thread_parameters(MPI_Comm comm, int base_core_id, int n_requested,

// assign each thread to a CUDA device or CPU core
#if defined(TECA_HAS_CUDA)
if (verbose && (n_threads < n_cuda_devices))
if (verbose && (n_ranks < 2) && (n_threads < n_cuda_devices))
{
TECA_WARNING(<< n_threads
<< " threads is insufficient to service " << n_cuda_devices
@@ -590,7 +596,7 @@ int thread_parameters(MPI_Comm comm, int base_core_id, int n_requested,
if (!bind)
{
if (verbose)
TECA_STATUS("thread to core binding disabled")
TECA_STATUS("thread to core binding disabled. n_threads=" << n_threads)

return 0;
}
The remaining 10 changed files in this commit are not shown here.