diff --git a/alg/teca_bayesian_ar_detect.cxx b/alg/teca_bayesian_ar_detect.cxx
index a81bd2fee..30afad9a9 100644
--- a/alg/teca_bayesian_ar_detect.cxx
+++ b/alg/teca_bayesian_ar_detect.cxx
@@ -692,7 +692,7 @@ void teca_bayesian_ar_detect::set_modified()
 void teca_bayesian_ar_detect::set_thread_pool_size(int n)
 {
     this->internals->queue = new_teca_data_request_queue(
-        this->get_communicator(), n, -1, true, this->get_verbose());
+        this->get_communicator(), n, -1, -1, true, this->get_verbose());
 }
 
 // --------------------------------------------------------------------------
diff --git a/alg/teca_descriptive_statistics.cxx b/alg/teca_descriptive_statistics.cxx
index a02b469c7..242c24782 100644
--- a/alg/teca_descriptive_statistics.cxx
+++ b/alg/teca_descriptive_statistics.cxx
@@ -306,8 +306,6 @@ const_p_teca_dataset teca_descriptive_statistics::execute(
         atrs.set("step", (teca_metadata)step_atts);
 
         table->get_metadata().set("attributes", atrs);
-
-        atrs.to_stream(std::cerr);
     }
 
     // for each variable
diff --git a/alg/teca_pytorch_algorithm.py b/alg/teca_pytorch_algorithm.py
index c0f831b06..c8ae53b09 100644
--- a/alg/teca_pytorch_algorithm.py
+++ b/alg/teca_pytorch_algorithm.py
@@ -145,7 +145,7 @@ def initialize(self):
             n_threads = self.n_threads
 
             n_threads, affinity, device_ids = \
-                thread_util.thread_parameters(comm, n_threads, 1, -1,
+                thread_util.thread_parameters(comm, n_threads, 1, -1, -1,
                                               0 if self.verbose < 2 else 1)
 
         # let the user request a bound on the number of threads
diff --git a/core/teca_cpu_thread_pool.h b/core/teca_cpu_thread_pool.h
index 4041ffc3f..396b31778 100644
--- a/core/teca_cpu_thread_pool.h
+++ b/core/teca_cpu_thread_pool.h
@@ -123,7 +123,7 @@ void teca_cpu_thread_pool::create_threads(MPI_Comm comm,
     std::vector<int> device_ids;
 
     if (teca_thread_util::thread_parameters(comm, -1,
-        n_requested, bind, -1, verbose, n_threads, core_ids,
+        n_requested, -1, -1, bind, verbose, n_threads, core_ids,
         device_ids))
     {
         TECA_WARNING("Failed to detetermine thread parameters."
@@ -229,9 +229,12 @@ int teca_cpu_thread_pool::wait_some(long n_to_wait,
             }
         }
 
-        // if we have not accumulated the requested number of datasets
+        // we have the requested number of datasets
+        if (data.size() >= static_cast<size_t>(n_to_wait))
+            break;
+
         // wait for the user supplied duration before re-scanning
-        if (thread_valid && (data.size() < static_cast<size_t>(n_to_wait)))
+        if (thread_valid)
             std::this_thread::sleep_for(std::chrono::nanoseconds(poll_interval));
     }
 
diff --git a/core/teca_cuda_thread_pool.h b/core/teca_cuda_thread_pool.h
index 1b702b906..618454291 100644
--- a/core/teca_cuda_thread_pool.h
+++ b/core/teca_cuda_thread_pool.h
@@ -67,18 +67,21 @@ class TECA_EXPORT teca_cuda_thread_pool
      *                  ranks running on the same node are taken into
      *                  account, resulting in 1 thread per core node wide.
      *
-     * @param[in] n_threads_per_device number of threads to assign to each CUDA
-     *                                 device. If 0 no threads will service
-     *                                 CUDA devices, this is CPU only mode.
-     *                                 If -1 the default of 8 threads per
-     *                                 device will be used.
+     * @param[in] threads_per_device number of threads to assign to each
+     *                               GPU/device. If 0 only CPUs will be used.
+     *                               If -1 the default of 8 threads per device
+     *                               will be used.
+     *
+     * @param[in] ranks_per_device the number of MPI ranks to allow access
+     *                             to each device/GPU.
      *
     * @param[in] bind bind each thread to a specific core.
     *
     * @param[in] verbose print a report of the thread to core bindings
     */
    teca_cuda_thread_pool(MPI_Comm comm, int n_threads,
-        int n_threads_per_device, bool bind, bool verbose);
+        int threads_per_device, int ranks_per_device, bool bind,
+        bool verbose);
 
     // get rid of copy and asignment
     TECA_ALGORITHM_DELETE_COPY_ASSIGN(teca_cuda_thread_pool)
@@ -114,7 +117,8 @@ class TECA_EXPORT teca_cuda_thread_pool
 private:
     /// create n threads for the pool
     void create_threads(MPI_Comm comm, int n_threads,
-        int n_threads_per_device, bool bind, bool verbose);
+        int threads_per_device, int ranks_per_device, bool bind,
+        bool verbose);
 
 private:
     long m_num_futures;
@@ -128,15 +132,18 @@ class TECA_EXPORT teca_cuda_thread_pool
 // --------------------------------------------------------------------------
 template <typename task_t, typename data_t>
 teca_cuda_thread_pool<task_t, data_t>::teca_cuda_thread_pool(MPI_Comm comm,
-    int n_threads, int n_threads_per_device, bool bind, bool verbose) : m_live(true)
+    int n_threads, int threads_per_device, int ranks_per_device, bool bind,
+    bool verbose) : m_live(true)
 {
-    this->create_threads(comm, n_threads, n_threads_per_device, bind, verbose);
+    this->create_threads(comm, n_threads, threads_per_device,
+        ranks_per_device, bind, verbose);
 }
 
 // --------------------------------------------------------------------------
 template <typename task_t, typename data_t>
 void teca_cuda_thread_pool<task_t, data_t>::create_threads(MPI_Comm comm,
-    int n_requested, int n_per_device, bool bind, bool verbose)
+    int n_requested, int threads_per_device, int ranks_per_device, bool bind,
+    bool verbose)
 {
     m_num_futures = 0;
 
@@ -146,8 +153,8 @@ void teca_cuda_thread_pool::create_threads(MPI_Comm comm,
     std::deque<int> core_ids;
     std::vector<int> device_ids;
 
-    if (teca_thread_util::thread_parameters(comm, -1,
-        n_requested, n_per_device, bind, verbose, n_threads,
+    if (teca_thread_util::thread_parameters(comm, -1, n_requested,
+        threads_per_device, ranks_per_device, bind, verbose, n_threads,
         core_ids, device_ids))
     {
         TECA_WARNING("Failed to detetermine thread parameters."
@@ -228,7 +235,7 @@ int teca_cuda_thread_pool::wait_some(long n_to_wait,
     // gather the requested number of datasets
     size_t thread_valid = 1;
-    while (thread_valid)
+    while (thread_valid && ((data.size() < static_cast<size_t>(n_to_wait))))
     {
         {
             thread_valid = 0;
@@ -252,9 +259,12 @@ int teca_cuda_thread_pool::wait_some(long n_to_wait,
             }
         }
 
-        // if we have not accumulated the requested number of datasets
+        // we have the requested number of datasets
+        if (data.size() >= static_cast<size_t>(n_to_wait))
+            break;
+
         // wait for the user supplied duration before re-scanning
-        if (thread_valid && (data.size() < static_cast<size_t>(n_to_wait)))
+        if (thread_valid)
             std::this_thread::sleep_for(std::chrono::nanoseconds(poll_interval));
     }
 
diff --git a/core/teca_cuda_util.cu b/core/teca_cuda_util.cu
index 434a1f0e2..300f36063 100644
--- a/core/teca_cuda_util.cu
+++ b/core/teca_cuda_util.cu
@@ -20,6 +20,10 @@ int synchronize()
 int get_local_cuda_devices(MPI_Comm comm, int &ranks_per_device,
     std::vector<int> &local_dev)
 {
+    // if ranks per device is zero this is a CPU only run
+    if (ranks_per_device == 0)
+        return 0;
+
     cudaError_t ierr = cudaSuccess;
 
     // get the number of CUDA GPU's available on this node
@@ -55,54 +59,43 @@ int get_local_cuda_devices(MPI_Comm comm, int &ranks_per_device,
         MPI_Comm_size(node_comm, &n_node_ranks);
         MPI_Comm_rank(node_comm, &node_rank);
 
-        // adjust the number of devices such that multiple ranks may share a
-        // device.
-        if (ranks_per_device < 0)
-        {
-            ranks_per_device = n_node_ranks / n_node_dev +
-                (n_node_ranks % n_node_dev ? 1 : 0);
-
-            // limit to at most 8 MPI ranks per GPU
-            ranks_per_device = std::min(ranks_per_device, 8);
-        }
-
-        int n_node_dev_use = n_node_dev * ranks_per_device;
-
-        if (n_node_dev_use >= n_node_ranks)
+        if (n_node_ranks < n_node_dev)
         {
+            // more devices than ranks,
             // assign devices evenly between ranks
-            int max_dev = n_node_dev_use - 1;
-            int n_per_rank = std::max(n_node_dev_use / n_node_ranks, 1);
-            int n_larger = n_node_dev_use % n_node_ranks;
+            int n_per_rank = n_node_dev / n_node_ranks;
+            int n_larger = n_node_dev % n_node_ranks;
+            int n_local = n_per_rank + (node_rank < n_larger ? 1 : 0);
 
             int first_dev = n_per_rank * node_rank
                 + (node_rank < n_larger ? node_rank : n_larger);
 
-            first_dev = std::min(max_dev, first_dev);
-
-            int last_dev = first_dev + n_per_rank - 1
-                + (node_rank < n_larger ? 1 : 0);
-
-            last_dev = std::min(max_dev, last_dev);
-
-            for (int i = first_dev; i <= last_dev; ++i)
-                local_dev.push_back( i % n_node_dev );
+            for (int i = 0; i < n_local; ++i)
+                local_dev.push_back(first_dev + i);
         }
         else
         {
-            // assign at most one MPI rank per GPU
-            if (node_rank < n_node_dev_use)
+            // TODO -- automatic settings
+            if (ranks_per_device < 0)
+                ranks_per_device *= -1;
+
+            // more ranks than devices. round robin assignment such that at
+            // most each device has ranks_per_device. the remaining ranks will
+            // be CPU only.
+            if (node_rank < ranks_per_device * n_node_dev)
+            {
                 local_dev.push_back( node_rank % n_node_dev );
+            }
         }
 
         MPI_Comm_free(&node_comm);
-        return 0;
     }
+    else
 #endif
-    // without MPI this process can use all CUDA devices
     if (ranks_per_device != 0)
     {
+        // without MPI this process can use all CUDA devices
        for (int i = 0; i < n_node_dev; ++i)
            local_dev.push_back(i);
     }
diff --git a/core/teca_cuda_util.h b/core/teca_cuda_util.h
index e77152ff1..1d13a6b12 100644
--- a/core/teca_cuda_util.h
+++ b/core/teca_cuda_util.h
@@ -19,18 +19,17 @@ namespace teca_cuda_util
 {
 /** Query the system for the locally available(on this rank) CUDA device count.
 * this is an MPI collective call which returns a set of device ids that can be
- * used locally. If there are as many (or more than) devices on the node than
- * the number of MPI ranks assigned to the node the list of device ids will be
- * unique across MPI ranks on the node. Otherwise devices are assigned round
- * robin fashion.
+ * used locally. Node wide coordination assures that one can put a limit on the
+ * number of ranks per device.
 *
 * @param[in] comm MPI communicator defining a set of nodes on which need
 *                 access to the available GPUS
 * @param[in,out] ranks_per_device The number of MPI ranks to use per CUDA
- *                                 device. Passing -1 will assign all MPI
- *                                 ranks a GPU up to a maximum of 8 ranks
- *                                 per GPU. The number of ranks per GPU
- *                                 used is returned through this argument.
+ *                                 device. When set to 0 no GPUs are used. When
+ *                                 set to -1 all ranks are assigned a GPU but
+ *                                 multiple ranks will share a GPU when there
+ *                                 are more ranks than devices.
+ *
 * @param[out] local_dev a list of device ids that can be used by the calling
 *                       MPI rank.
 * @returns non-zero on error.
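For reference, the rank to device assignment arithmetic implemented in get_local_cuda_devices above can be exercised in isolation. The following is a minimal standalone sketch, not TECA code; the device, rank, and sharing counts are hypothetical:

#include <iostream>
#include <vector>

int main()
{
    int n_node_dev = 4;       // CUDA devices on the node (assumed)
    int n_node_ranks = 10;    // MPI ranks on the node (assumed)
    int ranks_per_device = 2; // sharing limit (assumed)

    for (int node_rank = 0; node_rank < n_node_ranks; ++node_rank)
    {
        std::vector<int> local_dev;

        if (n_node_ranks < n_node_dev)
        {
            // more devices than ranks. split the devices evenly, the
            // first n_larger ranks receive one extra device
            int n_per_rank = n_node_dev / n_node_ranks;
            int n_larger = n_node_dev % n_node_ranks;
            int n_local = n_per_rank + (node_rank < n_larger ? 1 : 0);

            int first_dev = n_per_rank * node_rank
                + (node_rank < n_larger ? node_rank : n_larger);

            for (int i = 0; i < n_local; ++i)
                local_dev.push_back(first_dev + i);
        }
        else if (node_rank < ranks_per_device * n_node_dev)
        {
            // more ranks than devices. round robin until each device
            // serves ranks_per_device ranks, the rest are CPU only
            local_dev.push_back(node_rank % n_node_dev);
        }

        std::cerr << "rank " << node_rank << " ->";
        if (local_dev.empty())
            std::cerr << " CPU";
        for (int dev : local_dev)
            std::cerr << " device " << dev;
        std::cerr << std::endl;
    }

    return 0;
}

With the assumed counts, ranks 0 through 7 are assigned devices 0 through 3 in round robin order and ranks 8 and 9 fall back to the CPU.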
diff --git a/core/teca_thread_util.cxx b/core/teca_thread_util.cxx
index 1ac210a11..39975658d 100644
--- a/core/teca_thread_util.cxx
+++ b/core/teca_thread_util.cxx
@@ -339,11 +339,12 @@ int generate_report(MPI_Comm comm, int local_proc, int base_id,
 // **************************************************************************
 int thread_parameters(MPI_Comm comm, int base_core_id, int n_requested,
-    int n_per_device, bool bind, bool verbose, int &n_threads,
-    std::deque<int> &affinity, std::vector<int> &device_ids)
+    int threads_per_device, int ranks_per_device, bool bind, bool verbose,
+    int &n_threads, std::deque<int> &affinity, std::vector<int> &device_ids)
 {
 #if !defined(TECA_HAS_CUDA)
-    (void) n_per_device;
+    (void) threads_per_device;
+    (void) ranks_per_device;
 #endif
 
     // this rank is excluded from computations
@@ -379,7 +380,6 @@ int thread_parameters(MPI_Comm comm, int base_core_id, int n_requested,
 
 #if defined(TECA_HAS_CUDA)
     // check for an override to the default number of MPI ranks per device
-    int ranks_per_device = 1;
     int ranks_per_device_set = teca_system_util::get_environment_variable
         ("TECA_RANKS_PER_DEVICE", ranks_per_device);
 
@@ -404,14 +404,14 @@ int thread_parameters(MPI_Comm comm, int base_core_id, int n_requested,
     int default_per_device = 8;
 
     if (!teca_system_util::get_environment_variable
-        ("TECA_THREADS_PER_DEVICE", n_per_device) &&
+        ("TECA_THREADS_PER_DEVICE", threads_per_device) &&
         verbose && (rank == 0))
     {
-        TECA_STATUS("TECA_THREADS_PER_DEVICE = " << n_per_device)
+        TECA_STATUS("TECA_THREADS_PER_DEVICE = " << threads_per_device)
     }
 
     int n_device_threads = n_cuda_devices *
-        (n_per_device < 0 ? default_per_device : n_per_device);
+        (threads_per_device < 0 ? default_per_device : threads_per_device);
 
 #endif
 
@@ -528,16 +528,22 @@ int thread_parameters(MPI_Comm comm, int base_core_id, int n_requested,
     // thread pool size is based on core and process count
     int nlg = 0;
+    // map threads to physical cores
+    nlg = cores_per_node % n_procs;
+    n_threads = cores_per_node / n_procs + (proc_id < nlg ? 1 : 0);
     if (n_requested > 0)
     {
-        // user specified override
+        // use exactly this many
         n_threads = n_requested;
     }
+    else if (n_requested < -1)
+    {
+        // use at most this many
+        n_threads = std::min(-n_requested, n_threads);
+        n_requested = n_threads;
+    }
     else
     {
-        // map threads to physical cores
-        nlg = cores_per_node % n_procs;
-        n_threads = cores_per_node/n_procs + (proc_id < nlg ? 1 : 0);
     }
 
     // if the user runs more MPI ranks than cores some of the ranks
@@ -560,7 +566,7 @@ int thread_parameters(MPI_Comm comm, int base_core_id, int n_requested,
 
     // assign each thread to a CUDA device or CPU core
 #if defined(TECA_HAS_CUDA)
-    if (verbose && (n_threads < n_cuda_devices))
+    if (verbose && (n_ranks < 2) && (n_threads < n_cuda_devices))
     {
         TECA_WARNING(<< n_threads
             << " threads is insufficient to service " << n_cuda_devices
@@ -590,7 +596,7 @@ int thread_parameters(MPI_Comm comm, int base_core_id, int n_requested,
     if (!bind)
     {
         if (verbose)
-            TECA_STATUS("thread to core binding disabled")
+            TECA_STATUS("thread to core binding disabled. n_threads=" << n_threads)
 
         return 0;
     }
diff --git a/core/teca_thread_util.h b/core/teca_thread_util.h
index 52e4a9d79..c420a8431 100644
--- a/core/teca_thread_util.h
+++ b/core/teca_thread_util.h
@@ -37,17 +37,22 @@ namespace teca_thread_util
 *                      be bound to to acheive this. Passing n_requested >= 1
 *                      specifies a run time override. This indicates that
 *                      caller wants to use a specific number of threads,
- *                      rather than one per physical core. In this case the
- *                      affinity map is also constructed.
+ *                      rather than one per physical core. Passing
+ *                      n_requested < -1 specifies a maximum to use if
+ *                      sufficient cores are available. In all cases the
+ *                      affinity map is constructed.
 *
 * @param[in] n_threads_per_device the number of threads that should service
- *                                 GPUs. If 0 no threads will service GPUs the
- *                                 run will be CPU only. If -1 the default
- *                                 setting (8 threads per GPU) will be used.
- *                                 This can be overriden at runtime with the
- *                                 TECA_THREADS_PER_DEVICE environment
+ *                                 GPUs. If 0 the run will be CPU only. If -1
+ *                                 the default setting (8 threads per GPU) will
+ *                                 be used. This can be overridden at runtime
+ *                                 with the TECA_THREADS_PER_DEVICE environment
 *                                 variable.
 *
+ * @param[in] n_ranks_per_device the number of MPI ranks that should be allowed
+ *                               to access each GPU. MPI ranks not allowed to
+ *                               access a GPU will execute on the CPU.
+ *
 * @param[in] bind if true extra work is done to determine an affinity map such
 *                 that each thread can be bound to a unique core on the node.
 *
@@ -57,10 +62,11 @@ namespace teca_thread_util
 *                      of threads one can use such that there is one
 *                      thread per phycial core taking into account all
 *                      ranks running on the node. if n_requested is >= 1
- *                      n_threads will be set to n_requested. This allows a
- *                      run time override for cases when the caller knows
- *                      how she wants to schedule things. if an error
- *                      occurs and n_requested is -1 this will be set to 1.
+ *                      n_threads will explicitly be set to n_requested. If
+ *                      n_requested < -1 at most -n_requested threads will
+ *                      be used. Fewer threads will be used if there are
+ *                      insufficient cores available. If an error occurs
+ *                      and n_requested is -1 this will be set to 1.
 *
 * @param[out] affinity an affinity map, describing for each of n_threads,
 *                      a core id that the thread can be bound to. if
@@ -83,8 +89,8 @@ namespace teca_thread_util
 */
TECA_EXPORT
 int thread_parameters(MPI_Comm comm, int base_core_id, int n_requested,
-    int n_threads_per_device, bool bind, bool verbose, int &n_threads,
-    std::deque<int> &affinity, std::vector<int> &device_ids);
+    int n_threads_per_device, int n_ranks_per_device, bool bind, bool verbose,
+    int &n_threads, std::deque<int> &affinity, std::vector<int> &device_ids);
 
 };
 #endif
diff --git a/core/teca_threaded_algorithm.cxx b/core/teca_threaded_algorithm.cxx
index b985afd32..743468ecc 100644
--- a/core/teca_threaded_algorithm.cxx
+++ b/core/teca_threaded_algorithm.cxx
@@ -23,13 +23,16 @@
 
 // **************************************************************************
 p_teca_data_request_queue new_teca_data_request_queue(MPI_Comm comm,
-    int n_threads, int n_threads_per_device, bool bind, bool verbose)
+    int n_threads, int threads_per_device, int ranks_per_device, bool bind,
+    bool verbose)
 {
 #if defined(TECA_HAS_CUDA)
-    return std::make_shared<teca_data_request_queue>(
-        comm, n_threads, n_threads_per_device, bind, verbose);
+    return std::make_shared<teca_data_request_queue>(comm,
+        n_threads, threads_per_device, ranks_per_device, bind,
+        verbose);
 #else
-    (void) n_threads_per_device;
+    (void) threads_per_device;
+    (void) ranks_per_device;
 
     return std::make_shared<teca_data_request_queue>(
         comm, n_threads, bind, verbose);
 #endif
 }
@@ -42,12 +45,16 @@ class teca_data_request
 public:
     teca_data_request(const p_teca_algorithm &alg,
         const teca_algorithm_output_port up_port,
-        const teca_metadata &up_req) : m_alg(alg),
-        m_up_port(up_port), m_up_req(up_req)
+        const teca_metadata &up_req, int pass_dev) : m_alg(alg),
+        m_up_port(up_port), m_up_req(up_req), m_pass_dev(pass_dev)
     {}
 
     const_p_teca_dataset operator()(int device_id = -1)
     {
+        // use the device assigned downstream
+        if (m_pass_dev)
+            m_up_req.get("device_id", device_id);
+
         // set the device on which to execute this pipeline
         m_up_req.set("device_id", device_id);
 
@@ -59,6 +66,7 @@ class teca_data_request
     p_teca_algorithm m_alg;
     teca_algorithm_output_port m_up_port;
     teca_metadata m_up_req;
+    int m_pass_dev;
 };
 
 
@@ -69,7 +77,8 @@ class teca_threaded_algorithm_internals
     teca_threaded_algorithm_internals() {}
 
     void thread_pool_resize(MPI_Comm comm, int n_threads,
-        int n_threads_per_device, bool bind, bool verbose);
+        int threads_per_device, int ranks_per_device, bool bind,
+        bool verbose);
 
     unsigned int get_thread_pool_size() const noexcept
     { return this->thread_pool ? this->thread_pool->size() : 0; }
@@ -80,10 +89,11 @@ class teca_threaded_algorithm_internals
 
 // --------------------------------------------------------------------------
 void teca_threaded_algorithm_internals::thread_pool_resize(MPI_Comm comm,
-    int n_threads, int n_threads_per_device, bool bind, bool verbose)
+    int n_threads, int threads_per_device, int ranks_per_device, bool bind,
+    bool verbose)
 {
-    this->thread_pool = new_teca_data_request_queue(
-        comm, n_threads, n_threads_per_device, bind, verbose);
+    this->thread_pool = new_teca_data_request_queue(comm,
+        n_threads, threads_per_device, ranks_per_device, bind, verbose);
 }
 
 
@@ -91,7 +101,7 @@ void teca_threaded_algorithm_internals::thread_pool_resize(MPI_Comm comm,
 // --------------------------------------------------------------------------
 teca_threaded_algorithm::teca_threaded_algorithm() : bind_threads(1),
     stream_size(-1), poll_interval(1000000),
-    threads_per_device(-1),
+    threads_per_device(-1), ranks_per_device(1), propagate_device_assignment(0),
     internals(new teca_threaded_algorithm_internals)
 {
 }
@@ -113,21 +123,25 @@ void teca_threaded_algorithm::get_properties_description(
     opts.add_options()
         TECA_POPTS_GET(int, prefix, bind_threads,
             "bind software threads to hardware cores")
-        TECA_POPTS_GET(int, prefix, verbose,
-            "print a run time report of settings")
         TECA_POPTS_GET(int, prefix, thread_pool_size,
            "number of threads in pool. When n == -1, 1 thread per core is "
            "created")
        TECA_POPTS_GET(int, prefix, stream_size,
           "number of datasests to pass per execute call. -1 means wait "
           "for all.")
-        TECA_POPTS_GET(long, prefix, poll_interval,
+        TECA_POPTS_GET(long long, prefix, poll_interval,
            "number of nanoseconds to wait between scans of the thread pool "
            "for completed tasks")
        TECA_POPTS_GET(int, prefix, threads_per_device,
-            "Sets the number of threads that service each CUDA GPU. If -1 the"
-            "default of 8 threads per CUDA GPU is used. If 0 no threads will"
-            "service GPUs (CPU only mode)")
+            "Sets the number of threads that service each CUDA GPU. If -1 the "
+            "default of 8 threads per CUDA GPU is used. If 0 only the CPU is used.")
+        TECA_POPTS_GET(int, prefix, ranks_per_device,
+            "Sets the number of MPI ranks allowed to access each CUDA GPU. If -1 "
+            "all ranks are allowed access, sharing GPUs when there are more ranks "
+            "than GPUs.")
+        TECA_POPTS_GET(int, prefix, propagate_device_assignment,
+            "When set device assignment is taken from the incoming request. "
" + "Otherwise the thread executing the upstream pipeline provides the " + "device assignment.") ; this->teca_algorithm::get_properties_description(prefix, opts); @@ -142,8 +156,13 @@ void teca_threaded_algorithm::set_properties(const std::string &prefix, this->teca_algorithm::set_properties(prefix, opts); TECA_POPTS_SET(opts, int, prefix, bind_threads) - TECA_POPTS_SET(opts, int, prefix, verbose) + TECA_POPTS_SET(opts, int, prefix, stream_size) + TECA_POPTS_SET(opts, long long, prefix, poll_interval) + TECA_POPTS_SET(opts, int, prefix, threads_per_device) + TECA_POPTS_SET(opts, int, prefix, ranks_per_device) + TECA_POPTS_SET(opts, int, prefix, propagate_device_assignment) + // force update the the thread pool settings std::string opt_name = (prefix.empty()?"":prefix+"::") + "thread_pool_size"; if (opts.count(opt_name)) this->set_thread_pool_size(opts[opt_name].as()); @@ -155,7 +174,8 @@ void teca_threaded_algorithm::set_thread_pool_size(int n) { TECA_PROFILE_METHOD(128, this, "set_thread_pool_size", this->internals->thread_pool_resize(this->get_communicator(), - n, this->threads_per_device, this->bind_threads, this->verbose); + n, this->threads_per_device, this->ranks_per_device, + this->bind_threads, this->verbose); ) } @@ -239,10 +259,12 @@ const_p_teca_dataset teca_threaded_algorithm::request_data( if (!up_reqs[i].empty()) { teca_algorithm_output_port &up_port - = alg->get_input_connection(i%n_inputs); + = alg->get_input_connection( i % n_inputs ); - teca_data_request dreq(get_algorithm(up_port), up_port, up_reqs[i]); - teca_data_request_task task(dreq); + teca_data_request_task task( + teca_data_request(get_algorithm(up_port), up_port, + up_reqs[i], this->propagate_device_assignment) + ); this->internals->thread_pool->push_task(task); } diff --git a/core/teca_threaded_algorithm.h b/core/teca_threaded_algorithm.h index 86de18d8a..6c3048753 100644 --- a/core/teca_threaded_algorithm.h +++ b/core/teca_threaded_algorithm.h @@ -40,17 +40,20 @@ using teca_data_request_queue = using p_teca_data_request_queue = std::shared_ptr; /** Allocate and initialize a new thread pool. + * * @param comm[in] The communicator to allocate thread across * @param n_threads[in] The number of threads to create per MPI rank. Use -1 to * map one thread per physical core on each node. - * @param n_threads_per_device[in] The number of threads to assign to servicing - * each CUDA device. -1 for all threads. + * @param threads_per_device[in] The number of threads to assign to servicing + * each GPU/device. + * @param ranks_per_device[in] The number of ranks allowed to access each GPU/device. * @param bind[in] If set then thread will be bound to a specific core. * @param verbose[in] If set then the mapping is sent to the stderr */ TECA_EXPORT p_teca_data_request_queue new_teca_data_request_queue(MPI_Comm comm, - int n_threads, int n_threads_per_device, bool bind, bool verbose); + int n_threads, int threads_per_device, int ranks_per_device, bool bind, + bool verbose); /// This is the base class defining a threaded algorithm. /** The strategy employed is to parallelize over upstream data requests using a @@ -85,13 +88,6 @@ class TECA_EXPORT teca_threaded_algorithm : public teca_algorithm /// Get the number of threads in the pool. unsigned int get_thread_pool_size() const noexcept; - /** @name verbose - * set/get the verbosity level. - */ - ///@{ - TECA_ALGORITHM_PROPERTY(int, verbose) - ///@} - /** @name bind_threads * set/get thread affinity mode. 
When 0 threads are not bound CPU cores, * allowing for migration among all cores. This will likely degrade @@ -118,16 +114,33 @@ class TECA_EXPORT teca_threaded_algorithm : public teca_algorithm TECA_ALGORITHM_PROPERTY(long long, poll_interval) ///@} - /** @name threads_per_cuda_device - * set the number of threads to service each CUDA device. + /** @name threads_per_device + * Set the number of threads to service each GPU/device. Other threads will + * use the CPU. */ ///@{ TECA_ALGORITHM_PROPERTY(int, threads_per_device) ///@} + /** @name ranks_per_device + * Set the number of ranks that have access to each GPU/device. Other ranks + * will use the CPU. + */ + ///@{ + TECA_ALGORITHM_PROPERTY(int, ranks_per_device) + ///@} + /// explicitly set the thread pool to submit requests to void set_data_request_queue(const p_teca_data_request_queue &queue); + /** @name propagate_device_assignment + * When set device assignment is taken from down stream request. + * Otherwise the thread executing the pipeline will provide the assignment. + */ + ///@{ + TECA_ALGORITHM_PROPERTY(int, propagate_device_assignment) + ///@} + protected: teca_threaded_algorithm(); @@ -156,6 +169,8 @@ class TECA_EXPORT teca_threaded_algorithm : public teca_algorithm int stream_size; long long poll_interval; int threads_per_device; + int ranks_per_device; + int propagate_device_assignment; teca_threaded_algorithm_internals *internals; }; diff --git a/io/teca_cf_writer.cxx b/io/teca_cf_writer.cxx index e5f3878b9..3141a0ba3 100644 --- a/io/teca_cf_writer.cxx +++ b/io/teca_cf_writer.cxx @@ -147,7 +147,7 @@ void teca_cf_writer::get_properties_description( "the list of non-geometric arrays to write") ; - this->teca_algorithm::get_properties_description(prefix, opts); + this->teca_threaded_algorithm::get_properties_description(prefix, opts); global_opts.add(opts); } diff --git a/python/teca_py_core.i b/python/teca_py_core.i index d782c9ac6..dd87bea81 100644 --- a/python/teca_py_core.i +++ b/python/teca_py_core.i @@ -1105,8 +1105,8 @@ struct thread_util // each thread. static PyObject *thread_parameters(MPI_Comm comm, - int n_requested, int bind, int n_per_device, - int verbose) + int n_requested, int bind, int threads_per_device, + int ranks_per_device, int verbose) { teca_py_gil_state gil; @@ -1114,8 +1114,8 @@ PyObject *thread_parameters(MPI_Comm comm, std::vector device_ids; int n_threads = n_requested; if (teca_thread_util::thread_parameters(comm, -1, - n_requested, bind, n_per_device, verbose, n_threads, - affinity, device_ids)) + n_requested, bind, threads_per_device, ranks_per_device, + verbose, n_threads, affinity, device_ids)) { // caller requested automatic load balancing but this, failed. 
         PyErr_Format(PyExc_RuntimeError,
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index ea9d8d35f..4556d3dee 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -1019,10 +1019,19 @@ teca_add_test(test_ha4_connected_components_periodic
    COMMAND test_ha4_connected_components 256 256 14 1 1024 1
    FEATURES ${TECA_HAS_CUDA})
 
+teca_add_test(test_cpp_temporal_reduction_inmem
+    SOURCES test_cpp_temporal_reduction_inmem.cpp
+    LIBS teca_core teca_data teca_io teca_alg ${teca_test_link}
+    COMMAND test_cpp_temporal_reduction_inmem
+    360 180 5 24 1 ${TEST_CORES} ${TEST_CORES} monthly average 1 #TODO -- more than 1 step per req
+    test_cpp_temporal_reduction_inmem_%t%.nc yearly ${TEST_CORES} 1 1 1)
 
-teca_add_test(test_cpp_temporal_reduction
-    EXEC_NAME test_temporal_reduction
-    SOURCES test_temporal_reduction.cpp
+teca_add_test(test_cpp_temporal_reduction_io
+    SOURCES test_cpp_temporal_reduction_io.cpp
    LIBS teca_core teca_data teca_io teca_alg ${teca_test_link}
-    COMMAND test_temporal_reduction
-    360 180 5 24 1 1 1 monthly average test_cpp_temporal_reduction_%t%.nc yearly 1 1 1 1)
+    COMMAND test_cpp_temporal_reduction_io
+    ${TECA_DATA_ROOT}/prw_hus_day_MRI-CGCM3_historical_r1i1p1_19500101-19501231\.nc
+    lon lat . time prw 0 -1 ${TEST_CORES} ${TEST_CORES} ${TEST_CORES} 4 1 0
+    monthly average 1 # TODO -- more than 1 step per req.
+    ./test_cpp_temporal_reduction_io_%t%.nc yearly
+    ${TEST_CORES} ${TEST_CORES} ${TEST_CORES} 1 1)
diff --git a/test/python/test_thread_parameters.py b/test/python/test_thread_parameters.py
index b665830de..c2497f227 100644
--- a/test/python/test_thread_parameters.py
+++ b/test/python/test_thread_parameters.py
@@ -1,7 +1,7 @@
 from teca import *
 from mpi4py.MPI import COMM_SELF
 
-n,affin,devs = thread_util.thread_parameters(COMM_SELF, -1, 1, -1, 1)
+n,affin,devs = thread_util.thread_parameters(COMM_SELF, -1, 1, -1, -1, 1)
 
 print('num_threads = %d'%(n))
 print('affinity = %s'%(str(affin)))
diff --git a/test/python/test_thread_parameters_mpi.py b/test/python/test_thread_parameters_mpi.py
index 4e0adad84..10e5c37b3 100644
--- a/test/python/test_thread_parameters_mpi.py
+++ b/test/python/test_thread_parameters_mpi.py
@@ -22,7 +22,7 @@
 sys.stderr.write('\n\nTesting automatic load balancing w/ affiniity map\n')
 try:
     n_threads, affinity, device_ids = \
-        thread_util.thread_parameters(MPI.COMM_WORLD, -1, 1, -1, verbose)
+        thread_util.thread_parameters(MPI.COMM_WORLD, -1, 1, -1, -1, verbose)
 except(RuntimeError):
     sys.stderr.write('Failed to determine threading parameters\n')
 for i in range(n_ranks):
@@ -39,7 +39,7 @@
 sys.stderr.write('\n\nTesting automatic load balancing w/o affiniity map\n')
 try:
     n_threads, affinity, device_ids = \
-        thread_util.thread_parameters(MPI.COMM_WORLD, -1, 0, -1, 0)
+        thread_util.thread_parameters(MPI.COMM_WORLD, -1, 0, -1, -1, 0)
 except(RuntimeError):
     sys.stderr.write('Failed to determine threading parameters\n')
 for i in range(n_ranks):
@@ -56,7 +56,7 @@
 sys.stderr.write('\n\nTesting explcit load balancing (%d threads per rank) w/ affiniity map\n'%(n_explicit))
 try:
     n_threads, affinity, device_ids = \
-        thread_util.thread_parameters(MPI.COMM_WORLD, n_explicit, 1, -1, verbose)
+        thread_util.thread_parameters(MPI.COMM_WORLD, n_explicit, 1, -1, -1, verbose)
 except(RuntimeError):
     sys.stderr.write('Failed to determine threading parameters\n')
 for i in range(n_ranks):
@@ -73,7 +73,7 @@
 sys.stderr.write('\n\nTesting explcit load balancing (%d threads per rank) w/o affiniity map\n'%(n_explicit))
 try:
     n_threads, affinity, device_ids = \
-        thread_util.thread_parameters(MPI.COMM_WORLD, n_explicit, 0, -1, 0)
+        thread_util.thread_parameters(MPI.COMM_WORLD, n_explicit, 0, -1, -1, 0)
 except(RuntimeError):
     sys.stderr.write('Failed to determine threading parameters\n')
 for i in range(n_ranks):
diff --git a/test/test_temporal_reduction.cpp b/test/test_cpp_temporal_reduction_inmem.cpp
similarity index 96%
rename from test/test_temporal_reduction.cpp
rename to test/test_cpp_temporal_reduction_inmem.cpp
index 12af44e9c..99609b153 100644
--- a/test/test_temporal_reduction.cpp
+++ b/test/test_cpp_temporal_reduction_inmem.cpp
@@ -145,7 +145,7 @@ generate(size_t nx, size_t ny, size_t nz, size_t nt,
     size_t nxyzt = nxyz*nt;
 
     double *pf_xyzt;
-    std::tie(f_xyzt, pf_xyzt) = ::New(nxyzt, 0.0, allocator::malloc);
+    std::tie(f_xyzt, pf_xyzt) = ::New(nxyzt, allocator::malloc);
 
     double pi = 3.14159235658979;
 
@@ -365,12 +365,12 @@ int main(int argc, char **argv)
     teca_system_interface::set_stack_trace_on_error();
     teca_system_interface::set_stack_trace_on_mpi_error();
 
-    if (argc != 16)
+    if (argc != 17)
     {
         std::cerr << "test_temporal_reduction [nx in] [ny in] [nz in]"
            " [steps per day] [num years]"
            " [num reduction threads] [threads per device]"
-            " [reduction interval] [reduction operator]"
+            " [reduction interval] [reduction operator] [steps per request]"
            " [out file] [file layout] [num writer threads]"
            " [nx out] [ny out] [nz out]" << std::endl;
         return -1;
@@ -389,14 +389,15 @@ int main(int argc, char **argv)
     int threads_per_dev = atoi(argv[7]);
     std::string red_int = argv[8];
     std::string red_op = argv[9];
+    int steps_per_req = atoi(argv[10]);
 
-    std::string ofile_name = argv[10];
-    std::string layout = argv[11];
-    int n_wri_threads = atoi(argv[12]);
+    std::string ofile_name = argv[11];
+    std::string layout = argv[12];
+    int n_wri_threads = atoi(argv[13]);
 
-    unsigned long nx_out = atoi(argv[13]);
-    unsigned long ny_out = atoi(argv[14]);
-    unsigned long nz_out = atoi(argv[15]);
+    unsigned long nx_out = atoi(argv[14]);
+    unsigned long ny_out = atoi(argv[15]);
+    unsigned long nz_out = atoi(argv[16]);
 
     double T0 = 360.0; // period of 1 year
     double T1 = 1.0;   // period of 1 day
@@ -413,6 +414,7 @@ int main(int argc, char **argv)
         << " nt=" << nt << " reduce_threads=" << n_red_threads
         << " threads_per_dev=" << threads_per_dev
         << " red_int=" << red_int << " red_op=" << red_op
+        << " steps_per_req=" << steps_per_req
        << " layout=" << layout << " writer_threads=" << n_wri_threads
        << " nx_out=" << nx_out << " ny_out=" << ny_out
        << " nz_out=" << nz_out << std::endl;
@@ -439,7 +441,7 @@ int main(int argc, char **argv)
     reduc->set_interval(red_int);
     reduc->set_operation(red_op);
     reduc->set_point_arrays({"f_t"});
-
+    reduc->set_steps_per_request(steps_per_req);
 
     // low res output mesh
     auto md = reduc->update_metadata();
diff --git a/test/test_cpp_temporal_reduction_io.cpp b/test/test_cpp_temporal_reduction_io.cpp
new file mode 100644
index 000000000..9334c21d7
--- /dev/null
+++ b/test/test_cpp_temporal_reduction_io.cpp
@@ -0,0 +1,136 @@
+#include "teca_config.h"
+#include "teca_metadata.h"
+#include "teca_cf_reader.h"
+#include "teca_temporal_reduction.h"
+#include "teca_cf_writer.h"
+#include "teca_mpi_manager.h"
+#include "teca_system_interface.h"
+
+#include <chrono>
+#include <cstdlib>
+#include <deque>
+#include <iostream>
+#include <string>
+#include <vector>
+
+using microseconds_t = std::chrono::duration<double, std::micro>;
+
+int main(int argc, char **argv)
+{
+    auto t0 = std::chrono::high_resolution_clock::now();
+
+    teca_mpi_manager mpi_man(argc, argv);
+    int rank = mpi_man.get_comm_rank();
+    int n_ranks = mpi_man.get_comm_size();
+
+    teca_system_interface::set_stack_trace_on_error();
+    teca_system_interface::set_stack_trace_on_mpi_error();
+
+    if (argc != 25)
+    {
+        if (rank == 0)
+        {
+            std::cerr << "test_cpp_temporal_reduction_io [files regex]"
+                " [x axis var] [y axis var] [z axis var] [t axis var]"
+                " [red var] [first step] [last step]"
+                " [red threads] [red threads per dev] [red ranks per dev] [red stream size] [red bind threads] [red prop dev]"
+                " [reduction interval] [reduction operator] [steps per request]"
+                " [out file] [file layout]"
+                " [wri threads] [wri threads per dev] [wri ranks per dev] [wri stream size] [wri bind threads]"
+                << std::endl;
+        }
+        return -1;
+    }
+
+    std::string files_regex = argv[1];
+    std::string x_axis_var = argv[2];
+    std::string y_axis_var = argv[3];
+    std::string z_axis_var = argv[4];
+    std::string t_axis_var = argv[5];
+    std::string red_var = argv[6];
+    int first_step = atoi(argv[7]);
+    int last_step = atoi(argv[8]);
+
+    int n_red_threads = atoi(argv[9]);
+    int red_threads_per_dev = atoi(argv[10]);
+    int red_ranks_per_dev = atoi(argv[11]);
+    int red_stream_size = atoi(argv[12]);
+    int red_bind_threads = atoi(argv[13]);
+    int red_prop_dev_id = atoi(argv[14]);
+
+    std::string red_int = argv[15];
+    std::string red_op = argv[16];
+    int steps_per_req = atoi(argv[17]);
+
+    std::string ofile_name = argv[18];
+    std::string layout = argv[19];
+    int n_wri_threads = atoi(argv[20]);
+    int wri_threads_per_dev = atoi(argv[21]);
+    int wri_ranks_per_dev = atoi(argv[22]);
+    int wri_stream_size = atoi(argv[23]);
+    int wri_bind_threads = atoi(argv[24]);
+
+
+    if (rank == 0)
+    {
+        std::cerr << "n_ranks=" << n_ranks
+            << " n_red_threads=" << n_red_threads << " red_threads_per_dev=" << red_threads_per_dev
+            << " red_ranks_per_dev=" << red_ranks_per_dev << " red_stream_size=" << red_stream_size
+            << " red_bind_threads=" << red_bind_threads << " red_prop_dev_id=" << red_prop_dev_id
+            << " red_int=" << red_int << " red_op=" << red_op << " steps_per_req=" << steps_per_req
+            << " layout=" << layout << " n_wri_threads=" << n_wri_threads
+            << " wri_threads_per_dev=" << wri_threads_per_dev
+            << " wri_ranks_per_dev=" << wri_ranks_per_dev << " wri_stream_size=" << wri_stream_size
+            << " wri_bind_threads=" << wri_bind_threads
+            << std::endl;
+    }
+
+    // reader
+    auto cf_reader = teca_cf_reader::New();
+    cf_reader->set_x_axis_variable(x_axis_var);
+    cf_reader->set_y_axis_variable(y_axis_var);
+    cf_reader->set_z_axis_variable(z_axis_var == "." ?
+        std::string() : z_axis_var);
+    cf_reader->set_t_axis_variable(t_axis_var);
+    cf_reader->set_files_regex(files_regex);
+
+    // temporal reduction
+    auto reduc = teca_cpp_temporal_reduction::New();
+    reduc->set_input_connection(cf_reader->get_output_port());
+    reduc->set_verbose(1);
+    reduc->set_threads_per_device(red_threads_per_dev);
+    reduc->set_ranks_per_device(red_ranks_per_dev);
+    reduc->set_bind_threads(red_bind_threads);
+    reduc->set_stream_size(red_stream_size);
+    reduc->set_propagate_device_assignment(red_prop_dev_id);
+    reduc->set_thread_pool_size(n_red_threads);
+    reduc->set_interval(red_int);
+    reduc->set_operation(red_op);
+    reduc->set_point_arrays({red_var});
+    reduc->set_steps_per_request(steps_per_req);
+
+    // writer
+    auto cfw = teca_cf_writer::New();
+    cfw->set_input_connection(reduc->get_output_port());
+    cfw->set_verbose(1);
+    cfw->set_threads_per_device(wri_threads_per_dev);
+    cfw->set_ranks_per_device(wri_ranks_per_dev);
+    cfw->set_stream_size(wri_stream_size);
+    cfw->set_bind_threads(wri_bind_threads);
+    cfw->set_thread_pool_size(n_wri_threads);
+    cfw->set_file_name(ofile_name);
+    cfw->set_layout(layout);
+    cfw->set_point_arrays({red_var});
+    cfw->set_first_step(first_step);
+    cfw->set_last_step(last_step);
+
+    cfw->update();
+
+    if (rank == 0)
+    {
+        auto t1 = std::chrono::high_resolution_clock::now();
+        microseconds_t dt(t1 - t0);
+        std::cerr << "total runtime : " << (dt.count() / 1e6) << std::endl;
+    }
+
+    return 0;
+}
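The new controls can also be driven directly through the updated thread_parameters entry point. A minimal sketch, assuming an MPI enabled TECA build; the values are illustrative, and the -4 exercises the new n_requested < -1 "at most" mode:

#include "teca_thread_util.h"
#include "teca_mpi_manager.h"

#include <deque>
#include <vector>
#include <iostream>

int main(int argc, char **argv)
{
    teca_mpi_manager mpi_man(argc, argv);

    int n_threads = 0;
    std::deque<int> affinity;
    std::vector<int> device_ids;

    // base_core_id of -1 asks for automatic placement, -4 asks for at most
    // 4 threads, -1 threads per device selects the default of 8, -1 ranks
    // per device lets all ranks share the available GPUs, and bind and
    // verbose are both enabled
    if (teca_thread_util::thread_parameters(MPI_COMM_WORLD, -1, -4,
        -1, -1, true, true, n_threads, affinity, device_ids))
    {
        std::cerr << "failed to determine thread parameters" << std::endl;
        return -1;
    }

    std::cerr << "n_threads = " << n_threads << std::endl;
    return 0;
}

On success n_threads holds the number of threads to create, affinity a core id for each thread to bind to, and device_ids a device assignment for each thread.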