Rewrite LoadBalancerMaster to improve parallel scaling
* Directly send new job upon receiving previous result
* Increase queue length limit (maybe remove completely?)
* Try freeing sendbuffers less frequently
* Document
* Remove sched_yield
* Replace magic values
dweindl committed Jan 18, 2020
1 parent 89bdbf7 commit c3050a7
Showing 2 changed files with 140 additions and 70 deletions.
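Taken together, these changes simplify the dispatcher loop in loadBalancerThreadRun(): instead of probing, yielding, and sending at most one job per iteration, the thread now drains the queue onto all free workers and hands out the next job as soon as a result comes in. In outline (a simplified sketch of the new loop shown in the diff below; cancellation points and details omitted):

while (true) {
    // Send queued jobs while there are free workers.
    int freeWorkerIndex;
    while ((freeWorkerIndex = getNextFreeWorkerIndex()) >= 0
           && sendQueuedJob(freeWorkerIndex)) {}

    // Receive finished jobs; handleFinishedJobs() immediately sends the next
    // queued job to each worker that just reported a result.
    handleFinishedJobs();

    // Free send buffers whose asynchronous sends have completed.
    freeEmptiedSendBuffers();
}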
113 changes: 94 additions & 19 deletions include/parpeloadbalancer/loadBalancerMaster.h
@@ -14,7 +14,7 @@

namespace parpe {

/** data to be sent to workers */
/** Data to be sent to workers */
struct JobData {
JobData() = default;

@@ -35,21 +35,24 @@ struct JobData {
/** data to receive (set when job finished) */
std::vector<char> recvBuffer;

/** incremented by one, once the results have been received */
/** incremented by one, once the results have been received (if set) */
int *jobDone = nullptr;

/** is signaled after jobDone has been incremented */
/** is signaled after jobDone has been incremented (if set) */
pthread_cond_t *jobDoneChangedCondition = nullptr;
/** is locked to signal jobDoneChangedCondition condition */
/** is locked to signal jobDoneChangedCondition condition (if set) */
pthread_mutex_t *jobDoneChangedMutex = nullptr;

/** callback when job is finished */
/** callback when job is finished (if set) */
std::function<void(JobData*)> callbackJobFinished = nullptr;
};


#ifdef PARPE_ENABLE_MPI

/**
* @brief The LoadBalancerMaster class sends jobs to workers, receives the
* results and signals the client.
*/
class LoadBalancerMaster {
public:
/**
@@ -66,78 +69,150 @@ class LoadBalancerMaster {

/**
* @brief Assign job ID and append to queue for sending to workers.
* @param data
* @param data Data to be sent (user keeps ownership).
*/

void queueJob(JobData *data);

/**
* @brief Stop the loadbalancer thread
*/
void terminate();

/**
* @brief Send termination signal to all workers and wait for receive.
*/
void sendTerminationSignalToAllWorkers();

/**
* @brief Returns whether we are ready to accept jobs (`run` was called, but
* `terminate` was not).
* @return true if running, false otherwise
*/
bool isRunning() const;

/**
* @brief Get number of jobs in queue
* @return Number of jobs in queue waiting to be sent
*/
int getNumQueuedJobs() const;

private:
/**
* @brief Thread entry point. This is run from run()
* @param "this"
* @param vpLoadBalancerMaster The LoadBalancerMaster instance (`this`)
* @return 0, always
*/
static void *threadEntryPoint(void *vpLoadBalancerMaster);

/**
* @brief Main function of the load balancer thread.
*
* Called from threadEntryPoint.
*
* @return nullptr
*/
void *loadBalancerThreadRun();

/**
* @brief Frees all send buffers whose respective MPI messages have been
* sent.
*/
void freeEmptiedSendBuffers();

/**
* @brief Check for finished jobs, receive their results, and send the next
* job if jobs are waiting.
* @return Index (not rank) of a worker that has finished and has not yet
* received new work, or NO_FREE_WORKER if there is no such worker.
*/
int handleFinishedJobs();

/**
* @brief Get index (not rank) of next free worker.
* @return That index, or NO_FREE_WORKER if no worker is free
*/
int getNextFreeWorkerIndex();

/**
* @brief getNextJob Pop oldest element from the queue and return.
* @brief Pop the oldest element from the queue and return it.
* @return The oldest queue element, or nullptr if the queue is empty.
*/
JobData *getNextJob();

/**
* @brief sendToWorker Send the given work package to the given worker and
* track requests
* @param workerIdx
* @param queueElement
* @brief Send the given work package to the given worker and track the
* send request.
* @param workerIdx Worker index (not rank)
* @param data Job data to send
*/
void sendToWorker(int workerIdx, JobData *data);

/**
* @brief receiveFinished Message received from worker, mark job as done.
* @param workerID
* @param jobID
* @brief Receive the result message indicated by mpiStatus, mark the
* corresponding job as done, and signal its completion.
*
* @param mpiStatus Status of the incoming result message.
* @return Index (not rank) of the worker that sent the result.
*/
int handleReply(MPI_Status *mpiStatus);

/**
* @brief Check if jobs are waiting in the queue and, if so, send one to the
* specified worker.
*
* Non-blocking.
*
* @param freeWorkerIndex Worker index to send job to. Assumed to be free.
* @return Whether a job has been sent
*/
bool sendQueuedJob(int freeWorkerIndex);

/** MPI communicator we are working on */
MPI_Comm mpiComm = MPI_COMM_WORLD;

/** MPI data type for job and result packages */
MPI_Datatype mpiJobDataType = MPI_BYTE;

/** Indicates whether we are ready to handle jobs */
bool isRunning_ = false;

/** Number of workers we can send jobs to */
int numWorkers = 0;

/** Queue with jobs to be sent to workers */
std::queue<JobData *> queue;

/** Last assigned job ID used as MPI message tag */
int lastJobId = 0;

// one for each worker, index is off by one from MPI rank
// because no job is sent to master (rank 0)
/** Keeps track of whether the respective worker is busy (received a job,
* but has not sent the result) or is ready to accept a new job.
*
* Length is `numWorkers`. Index is off by one from MPI rank because no job
* is sent to master (rank 0).
*/
std::vector<bool> workerIsBusy;

/** MPI requests for jobs sent asynchronously to workers. Used to track when
* the respective send buffers can be freed. */
std::vector<MPI_Request> sendRequests;

/** Jobs that have been sent to workers. Required for handling replies and
* signalling the client that processing has completed. */
std::vector<JobData *> sentJobsData;

/** Mutex to protect access to `queue`. */
pthread_mutex_t mutexQueue = PTHREAD_MUTEX_INITIALIZER;

/** Semaphore to limit queue length and avoid potentially huge memory
* allocation for all send and receive buffers. Note that using this may
* decrease performance due to frequent rescheduling.
*/
sem_t semQueue = {};

/** Thread that runs the message dispatcher. */
pthread_t queueThread = 0;

/** Value to indicate that there is currently no known free worker. */
constexpr static int NO_FREE_WORKER = -1;
};

#endif
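For orientation, a minimal client-side sketch of how this interface is meant to be used, based only on the JobData fields and methods documented above (hypothetical example, not code from this commit; it assumes the dispatcher thread has already been started via run()):

#include <parpeloadbalancer/loadBalancerMaster.h>
#include <pthread.h>
#include <vector>

// Hypothetical usage sketch based on the documented interface; not part of this commit.
void runJobs(parpe::LoadBalancerMaster &lbm, int numJobs) {
    int jobsDone = 0;
    pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

    std::vector<parpe::JobData> jobs(numJobs);
    for (auto &job : jobs) {
        job.sendBuffer = {'w', 'o', 'r', 'k'};  // problem-specific payload
        job.jobDone = &jobsDone;                // incremented once the result arrives
        job.jobDoneChangedCondition = &cond;    // signaled after jobDone is incremented
        job.jobDoneChangedMutex = &mutex;
        lbm.queueJob(&job);                     // caller keeps ownership; result lands in job.recvBuffer
    }

    // Wait until every queued job has been marked done.
    pthread_mutex_lock(&mutex);
    while (jobsDone < numJobs)
        pthread_cond_wait(&cond, &mutex);
    pthread_mutex_unlock(&mutex);
}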
97 changes: 46 additions & 51 deletions src/parpeloadbalancer/loadBalancerMaster.cpp
@@ -22,7 +22,7 @@ void LoadBalancerMaster::run() {
#endif

int mpiCommSize;
MPI_Comm_size(MPI_COMM_WORLD, &mpiCommSize);
MPI_Comm_size(mpiComm, &mpiCommSize);
assert(mpiCommSize > 1 &&
"Need multiple MPI processes!"); // crashes otherwise

@@ -32,13 +32,11 @@ void LoadBalancerMaster::run() {
// have to initialize before can wait!
sendRequests.resize(numWorkers, MPI_REQUEST_NULL);

/* Create semaphore to limit queue length
* and avoid huge memory allocation for all send and receive buffers.
*/
unsigned int queueMaxLength = mpiCommSize;
// Create semaphore to limit queue length
#ifdef SEM_VALUE_MAX
if(SEM_VALUE_MAX < queueMaxLength)
throw ParPEException("SEM_VALUE_MAX too small to work with the given MPI_Comm_size.");
unsigned int queueMaxLength = SEM_VALUE_MAX;
#else
unsigned int queueMaxLength = UINT_MAX;
#endif
sem_init(&semQueue, 0, queueMaxLength);

Expand Down Expand Up @@ -71,40 +69,16 @@ void *LoadBalancerMaster::loadBalancerThreadRun() {

// dispatch queued work packages
while (true) {
int freeWorkerIndex = NO_FREE_WORKER;

// empty send queue while there are free workers
while((freeWorkerIndex = getNextFreeWorkerIndex()) >= 0
&& sendQueuedJob(freeWorkerIndex)) {}

// check if any job finished
int lastFinishedWorkerIdx = handleFinishedJobs();
handleFinishedJobs();

freeEmptiedSendBuffers();

// getNextFreeWorker
int freeWorkerIndex = lastFinishedWorkerIdx;

if (freeWorkerIndex < 0) {
// no job finished recently, check free slots
freeWorkerIndex = getNextFreeWorkerIndex();
}

if (freeWorkerIndex < 0) {
// add cancellation point to avoid invalid reads in
// loadBalancer.recvRequests
pthread_testcancel();

// all workers are busy, wait for next one to finish
MPI_Status status;
MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
freeWorkerIndex = handleReply(&status);
}

// found free worker, check for jobs to do
JobData *currentQueueElement = getNextJob();

if (currentQueueElement) {
sendToWorker(freeWorkerIndex, currentQueueElement);
sentJobsData[freeWorkerIndex] = currentQueueElement;
}

sched_yield();
};

return nullptr;
@@ -113,7 +87,7 @@ void *LoadBalancerMaster::loadBalancerThreadRun() {
void LoadBalancerMaster::freeEmptiedSendBuffers() {
// free any emptied send buffers
while (true) {
int emptiedBufferIdx = -1;
int emptiedBufferIdx = MPI_UNDEFINED;
int anySendCompleted = 0;
MPI_Testany(sendRequests.size(), sendRequests.data(), &emptiedBufferIdx,
&anySendCompleted, MPI_STATUS_IGNORE);
@@ -131,22 +105,27 @@ void LoadBalancerMaster::freeEmptiedSendBuffers() {
}

int LoadBalancerMaster::handleFinishedJobs() {
MPI_Status status;
int finishedWorkerIdx = -1;
int finishedWorkerIdx = NO_FREE_WORKER;

// handle all finished jobs, if any
while (true) {
// add cancellation point to avoid invalid reads in
// loadBalancer.recvRequests
pthread_testcancel();

// check for waiting incoming message
MPI_Status status;
int messageWaiting = 0;
MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &messageWaiting,
&status);
MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, mpiComm,
&messageWaiting, &status);

if (messageWaiting) {
// some job is finished
// some job is finished, process that
finishedWorkerIdx = handleReply(&status);

// directly send new work if available
if(sendQueuedJob(finishedWorkerIdx))
finishedWorkerIdx = NO_FREE_WORKER; // not free anymore
} else {
// there was nothing to be finished
break;
@@ -161,7 +140,7 @@ int LoadBalancerMaster::getNextFreeWorkerIndex() {
return i;
}

return -1;
return NO_FREE_WORKER;
}

JobData *LoadBalancerMaster::getNextJob() {
@@ -192,8 +171,9 @@ void LoadBalancerMaster::sendToWorker(int workerIdx, JobData *data) {
printf("\x1b[31mSending job #%d to rank %d (%luB).\x1b[0m\n", tag, workerRank, data->sendBuffer.size());
#endif

MPI_Isend(data->sendBuffer.data(), data->sendBuffer.size(), MPI_BYTE, workerRank, tag,
MPI_COMM_WORLD, &sendRequests[workerIdx]);
MPI_Isend(data->sendBuffer.data(), data->sendBuffer.size(), mpiJobDataType,
workerRank, tag,
mpiComm, &sendRequests[workerIdx]);

sem_post(&semQueue);
}
@@ -246,7 +226,7 @@ int LoadBalancerMaster::handleReply(MPI_Status *mpiStatus) {

// allocate memory for result
int lenRecvBuffer = 0;
MPI_Get_count(mpiStatus, MPI_BYTE, &lenRecvBuffer);
MPI_Get_count(mpiStatus, mpiJobDataType, &lenRecvBuffer);
data->recvBuffer.resize(lenRecvBuffer);

#ifdef MASTER_QUEUE_H_SHOW_COMMUNICATION
Expand All @@ -255,8 +235,8 @@ int LoadBalancerMaster::handleReply(MPI_Status *mpiStatus) {
#endif

// receive
MPI_Recv(data->recvBuffer.data(), data->recvBuffer.size(), MPI_BYTE,
mpiStatus->MPI_SOURCE, mpiStatus->MPI_TAG, MPI_COMM_WORLD,
MPI_Recv(data->recvBuffer.data(), data->recvBuffer.size(), mpiJobDataType,
mpiStatus->MPI_SOURCE, mpiStatus->MPI_TAG, mpiComm,
MPI_STATUS_IGNORE);

workerIsBusy[workerIdx] = false;
@@ -281,15 +261,30 @@ int LoadBalancerMaster::handleReply(MPI_Status *mpiStatus) {
return workerIdx;
}

bool LoadBalancerMaster::sendQueuedJob(int freeWorkerIndex)
{
if (freeWorkerIndex < 0)
return false;

JobData *currentQueueElement = getNextJob();

if (currentQueueElement) {
sendToWorker(freeWorkerIndex, currentQueueElement);
sentJobsData[freeWorkerIndex] = currentQueueElement;
return true;
}
return false;
}

void LoadBalancerMaster::sendTerminationSignalToAllWorkers() {
int commSize;
MPI_Comm_size(MPI_COMM_WORLD, &commSize);
MPI_Comm_size(mpiComm, &commSize);

MPI_Request reqs[commSize - 1];

for (int i = 1; i < commSize; ++i) {
reqs[i - 1] = MPI_REQUEST_NULL;
MPI_Isend(MPI_BOTTOM, 0, MPI_INT, i, 0, MPI_COMM_WORLD, &reqs[i - 1]);
MPI_Isend(MPI_BOTTOM, 0, MPI_INT, i, 0, mpiComm, &reqs[i - 1]);
}
MPI_Waitall(commSize - 1, reqs, MPI_STATUS_IGNORE);
}
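On the worker side, a receive loop compatible with the protocol above would probe for messages from rank 0, treat a zero-length message as the termination signal sent by sendTerminationSignalToAllWorkers(), and return its result under the same tag so the master can match it to the job. A hypothetical sketch (illustration only; parPE's actual worker implementation is not part of this diff, and the default MPI_COMM_WORLD communicator is assumed):

#include <mpi.h>
#include <functional>
#include <vector>

// Hypothetical worker loop; illustration only, not parPE's worker code.
void workerLoop(std::function<std::vector<char>(std::vector<char> const &)> processJob) {
    while (true) {
        MPI_Status status;
        // Jobs always come from the master (rank 0); the tag carries the job ID.
        MPI_Probe(0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);

        int count = 0;
        MPI_Get_count(&status, MPI_BYTE, &count);
        if (count == 0) {
            // Zero-length message: termination signal.
            MPI_Recv(nullptr, 0, MPI_INT, 0, status.MPI_TAG, MPI_COMM_WORLD,
                     MPI_STATUS_IGNORE);
            break;
        }

        std::vector<char> buffer(count);
        MPI_Recv(buffer.data(), count, MPI_BYTE, 0, status.MPI_TAG,
                 MPI_COMM_WORLD, MPI_STATUS_IGNORE);

        // Reply with the same tag so the master can match the result to the job.
        std::vector<char> result = processJob(buffer);
        MPI_Send(result.data(), static_cast<int>(result.size()), MPI_BYTE, 0,
                 status.MPI_TAG, MPI_COMM_WORLD);
    }
}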
