Skip to content

Commit

Permalink
Add option to specify maximum number of threads for multi-start optim…
Browse files Browse the repository at this point in the history
…ization (#361)

So far, this was equal to the number of local optimizations, but that wouldn't scale well.

This decouples the number of parallel local optimizations from the total number of optimizations.

This is controlled by setting the environment variable `PARPE_NUM_PARALLEL_STARTS` to the number
of optimizations that should be run in parallel.

* Now requires Boost.Thread library
* Update dependencies
* Remove obsolete functions
  * getLocalOptimumThreadWrapper
  * MultiStartOptimization::createLocalOptimizationProblems

Closes #359, closes #84
  • Loading branch information
dweindl authored Nov 5, 2021
1 parent 82b2ba9 commit 9559ce3
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 131 deletions.
20 changes: 16 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ For full functionality, parPE requires the following libraries:
* IPOPT (>= 1.2.7) (requires coinhsl)
* CERES (>=1.13)
([requires Eigen](http://ceres-solver.org/installation.html#dependencies))
* [Boost](https://www.boost.org/) (serialization, thread)
* HDF5 (>= 1.10)
* CBLAS compatible BLAS (libcblas, Intel MKL, ...)
* [AMICI](https://github.com/ICB-DCM/AMICI) (included in this repository)
Expand All @@ -64,10 +65,21 @@ For full functionality, parPE requires the following libraries:

On Debian-based systems, dependencies can be installed via:
```shell
sudo apt-get install build-essential cmake cmake-curses-gui \
coinor-libipopt-dev curl gfortran \
libblas-dev libboost-serialization-dev libceres-dev \
libmpich-dev libhdf5-dev libpython3-dev python3-pip
sudo apt-get install \
build-essential \
cmake \
cmake-curses-gui \
coinor-libipopt-dev \
curl \
gfortran \
libblas-dev \
libboost-serialization-dev \
libboost-thread-dev \
libceres-dev \
libmpich-dev \
libhdf5-dev \
libpython3-dev \
python3-pip
```

Scripts to fetch and build the remaining dependencies are provided in
Expand Down
7 changes: 7 additions & 0 deletions doc/optimizationApplication.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ Run the created executable with the `-h`/`--help` argument.

## Environment variables

- **PARPE_NUM_PARALLEL_STARTS**

Setting `PARPE_NUM_PARALLEL_STARTS=n` will create a maximum of `n` threads
for concurrent local optimizations during multi-start optimization.
If unset, this defaults to the number of concurrent threads supported by
the hardware.

- **PARPE_LOG_SIMULATIONS**

Setting `PARPE_LOG_SIMULATIONS=1` will cause every single AMICI simulation to be saved in the result files.
Expand Down
7 changes: 5 additions & 2 deletions include/parpeoptimization/multiStartOptimization.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class MultiStartOptimization {
/**
* @brief Run all optimizations in parallel, each in a dedicated thread
*/
void runMultiThreaded();
void runMultiThreaded() const;

/**
* @brief Run optimizations sequentially
Expand All @@ -58,8 +58,11 @@ class MultiStartOptimization {
void setRunParallel(bool runParallel);

private:
/**
* @brief Optimize local problem for the given start index
*/
int runStart(int start_idx) const;

std::vector<OptimizationProblem *> createLocalOptimizationProblems();

/** Optimization problem to be solved */
MultiStartOptimizationProblem& msProblem;
Expand Down
9 changes: 0 additions & 9 deletions include/parpeoptimization/optimizationProblem.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,15 +217,6 @@ class OptimizationProblemImpl: public OptimizationProblem {
int getLocalOptimum(OptimizationProblem *problem);


/**
* @brief getLocalOptimumThreadWrapper wrapper for using getLocalOptimum with
* pThreads.
* @param optimizationProblemVp
* @return Pointer to int indicating status. 0: success, != 0: failure
*/

void *getLocalOptimumThreadWrapper(void *optimizationProblemVp);

void optimizationProblemGradientCheckMultiEps(OptimizationProblem *problem,
int numParameterIndicesToCheck);

Expand Down
2 changes: 1 addition & 1 deletion src/parpeamici/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ else()
# For python module we need -fPIC which is only the case with shared libs
set(Boost_USE_STATIC_LIBS FALSE)
endif()
find_package(Boost COMPONENTS serialization REQUIRED)
find_package(Boost COMPONENTS serialization thread REQUIRED)


project(parpeamici)
Expand Down
194 changes: 86 additions & 108 deletions src/parpeoptimization/multiStartOptimization.cpp
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
#include <boost/asio.hpp>

#include <parpeoptimization/multiStartOptimization.h>
#include <parpecommon/logging.h>
#include <parpecommon/parpeException.h>

#include <cstdlib>
#include <cstring>
#include <pthread.h>
#include <unistd.h>
#include <cassert>
#include <future>

namespace parpe {


MultiStartOptimization::MultiStartOptimization(
MultiStartOptimizationProblem &problem,
bool runParallel,
int first_start_idx)
MultiStartOptimizationProblem &problem,
bool runParallel,
int first_start_idx)
: msProblem(problem),
numberOfStarts(problem.getNumberOfStarts()),
restartOnFailure(problem.restartOnFailure()),
Expand All @@ -31,110 +31,90 @@ void MultiStartOptimization::run() {
runSingleThreaded();
}

void MultiStartOptimization::runMultiThreaded()
void MultiStartOptimization::runMultiThreaded() const
{
logmessage(loglevel::debug,
"Starting runParallelMultiStartOptimization with %d starts",
numberOfStarts);

std::vector<pthread_t> localOptimizationThreads(numberOfStarts);

std::vector<OptimizationProblem *> localProblems =
createLocalOptimizationProblems();

if(localProblems.size() != static_cast<std::vector<OptimizationProblem *>::size_type>(numberOfStarts)) {
throw ParPEException("Number of problems does not match number of specific starts.");
// Determine thread pool size
// (note that hardware_concurrency() may return 0)
auto num_threads = std::max(std::thread::hardware_concurrency(), 1U);
if(auto env = std::getenv("PARPE_NUM_PARALLEL_STARTS")) {
num_threads = std::stoi(env);
}
num_threads = std::min(num_threads,
static_cast<unsigned int>(numberOfStarts));

pthread_attr_t threadAttr;
pthread_attr_init(&threadAttr);
pthread_attr_setdetachstate(&threadAttr, PTHREAD_CREATE_JOINABLE);

int lastStartIdx = -1;
// launch threads for required number of starts
for (int ms = 0; ms < numberOfStarts; ++ms) {
logmessage(loglevel::debug,
"Running %d starts using %d threads",
numberOfStarts, num_threads);

boost::asio::thread_pool pool(num_threads);

auto num_successful_starts = 0;
auto num_finished_starts = 0;
auto lastStartIdx = -1;

// submit the minimum number of starts
std::vector<std::future<std::pair<int, int>>> futures;
futures.reserve(numberOfStarts);
for (int start_idx = 0; start_idx < numberOfStarts; ++start_idx) {
futures.push_back(
boost::asio::post(
pool,
std::packaged_task<std::pair<int, int>()>([this, start_idx] {
return std::make_pair(start_idx, runStart(start_idx));
})));
++lastStartIdx;

logmessage(loglevel::debug,
"Spawning thread for local optimization #%d (%d)",
lastStartIdx, ms);

auto ret = pthread_create(
&localOptimizationThreads.at(ms), &threadAttr,
getLocalOptimumThreadWrapper,
static_cast<void *>(localProblems[ms]));
if(ret) {
throw ParPEException("Failure during thread creation: "
+ std::to_string(ret));
}
}

int numCompleted = 0;
// Report finished runs and restart if necessary
while ((restartOnFailure && num_successful_starts < numberOfStarts)
|| (!restartOnFailure && num_finished_starts < numberOfStarts)) {
for (auto &future: futures) {
// future value might have been retrieved before
if(!future.valid()) {
continue;
}

while (numCompleted < numberOfStarts) {
for (int ms = 0; ms < numberOfStarts; ++ms) {
// problem still running?
if (!localProblems[ms])
if(auto status = future.wait_for(std::chrono::milliseconds(1));
status != std::future_status::ready) {
continue;
}

int *threadStatus = nullptr;
#ifndef __APPLE__
// TODO(#84) pthread_tryjoin_np is not available on macOS. can replace easily by pthread_join, but this would only allow restarting failed threads rather late, so we disable the retry option for now.
int joinStatus = pthread_tryjoin_np(localOptimizationThreads[ms],
reinterpret_cast<void **>(&threadStatus));
#else
int joinStatus = pthread_join(localOptimizationThreads[ms],
reinterpret_cast<void **>(&threadStatus));
#endif
if (joinStatus == 0) { // joined successful
delete localProblems[ms];
localProblems[ms] = nullptr;

if (*threadStatus == 0 || !restartOnFailure) {
if (*threadStatus == 0) {
logmessage(loglevel::debug,
"Thread ms #%d finished successfully", ms);
} else {
logmessage(loglevel::debug, "Thread ms #%d finished "
"unsuccessfully. Not trying "
"new starting point.",
ms);
}
++numCompleted;
}
#ifndef __APPLE__
else {
logmessage(loglevel::warning, "Thread ms #%d finished "
"unsuccessfully... trying new "
"starting point",
ms);
++lastStartIdx;

localProblems[ms] = msProblem.getLocalProblem(lastStartIdx).release();
logmessage(
loglevel::debug,
"Spawning thread for local optimization #%d (%d)",
lastStartIdx, ms);
auto ret = pthread_create(
&localOptimizationThreads[ms], &threadAttr,
getLocalOptimumThreadWrapper,
static_cast<void *>(localProblems[ms]));
if(ret) {
throw ParPEException("Failure during thread creation: "
+ std::to_string(ret));
}
}
#endif
delete threadStatus;
++num_finished_starts;
auto [start_idx, retval] = future.get();

if (retval == 0) {
// success
logmessage(loglevel::debug,
"Optimization #%d finished successfully",
start_idx);
++num_successful_starts;
} else if (!restartOnFailure) {
// failure, no new start
logmessage(loglevel::debug,
"Optimization ms #%d finished "
"unsuccessfully. Not trying "
"new starting point.",
start_idx);
} else {
// failure, new start
logmessage(loglevel::debug,
"Thread ms #%d finished unsuccessfully... "
"trying new starting point", start_idx);
++lastStartIdx;

future = boost::asio::post(
pool,
std::packaged_task<std::pair<int, int>()>(
[this, start_idx=lastStartIdx] {
return std::make_pair(start_idx, runStart(start_idx));
}));
}
}

sleep(1); // TODO: replace by condition via ThreadWrapper
}

logmessage(loglevel::debug, "runParallelMultiStartOptimization finished");
pool.join();

pthread_attr_destroy(&threadAttr);
logmessage(loglevel::debug, "Multi-start optimization finished.");
}

void MultiStartOptimization::runSingleThreaded()
Expand All @@ -153,15 +133,15 @@ void MultiStartOptimization::runSingleThreaded()
if(ms == numberOfStarts)
break;

auto problem = msProblem.getLocalProblem(first_start_idx + ms);
auto result = getLocalOptimum(problem.get());
auto result = runStart(ms);

if(result) {
logmessage(loglevel::debug,
"Start #%d finished successfully", ms);
++numSucceeded;
} else {
logmessage(loglevel::debug, "Thread ms #%d finished "
"unsuccessfully.",ms);
logmessage(loglevel::debug, "Start ms #%d finished "
"unsuccessfully.",ms);
}
++ms;
}
Expand All @@ -174,15 +154,13 @@ void MultiStartOptimization::setRunParallel(bool runParallel)
this->runParallel = runParallel;
}

std::vector<OptimizationProblem *>
MultiStartOptimization::createLocalOptimizationProblems() {
std::vector<OptimizationProblem *> localProblems(numberOfStarts);

for (int ms = 0; ms < numberOfStarts; ++ms) {
localProblems[ms] = msProblem.getLocalProblem(first_start_idx + ms).release();
}
int MultiStartOptimization::runStart(int start_idx) const
{
logmessage(loglevel::debug,
"Starting local optimization #%d", start_idx);

return localProblems;
auto problem = msProblem.getLocalProblem(first_start_idx + start_idx);
return getLocalOptimum(problem.get());
}

} // namespace parpe
7 changes: 0 additions & 7 deletions src/parpeoptimization/optimizationProblem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,6 @@ int getLocalOptimum(OptimizationProblem *problem) {
}


void *getLocalOptimumThreadWrapper(void *optimizationProblemVp) {
auto problem = static_cast<OptimizationProblem *>(optimizationProblemVp);
auto *result = new int;
*result = getLocalOptimum(problem);
return result;
}

void optimizationProblemGradientCheckMultiEps(OptimizationProblem *problem,
int numParameterIndicesToCheck
) {
Expand Down

0 comments on commit 9559ce3

Please sign in to comment.