Skip to content

Commit

Permalink
Iterate flushing twice
Browse files — browse the repository at this point in the history
  • Loading branch information
lukemartinlogan committed Dec 14, 2023
1 parent a18b1c9 commit 37dfeb9
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 21 deletions.
25 changes: 10 additions & 15 deletions hrun/include/hrun/work_orchestrator/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -383,10 +383,19 @@ class Worker {
EndTask(lane, exec, task, off);
continue;
}
// Attempt to run the task if it's ready and runnable
// Get task properties
bool is_remote = task->domain_id_.IsRemote(HRUN_RPC->GetNumHosts(), HRUN_CLIENT->node_id_);
bool group_avail = CheckTaskGroup(task, exec, work_entry.lane_id_, task->task_node_, is_remote);
bool should_run = task->ShouldRun(work_entry.cur_time_, flushing);
// Verify tasks
if (flushing && !task->IsFlush()) {
if (task->IsLongRunning()) {
exec->Monitor(MonitorMode::kFlushStat, task, rctx);
} else {
flush_.count_ += 1;
}
}
// Attempt to run the task if it's ready and runnable
if (!task->IsRunDisabled() && group_avail && should_run) {
// #define REMOTE_DEBUG
#ifdef REMOTE_DEBUG
Expand Down Expand Up @@ -435,20 +444,6 @@ class Worker {
}
task->DidRun(work_entry.cur_time_);
}
// Verify tasks
if (flushing && !task->IsFlush()) {
// int pend_prior = flush_.count_;
if (task->IsLongRunning()) {
exec->Monitor(MonitorMode::kFlushStat, task, rctx);
} else {
flush_.count_ += 1;
}
// if (pend_prior != flush_.count_) {
// HILOG(kDebug, "(node {}) Pending on task={} state={} method={} is_remote={} worker={}",
// HRUN_CLIENT->node_id_, task->task_node_, task->task_state_, task->method_,
// is_remote, id_)
// }
}
// Cleanup on task completion
if (task->IsModuleComplete()) {
// HILOG(kDebug, "(node {}) Ending task: task_node={} task_state={} worker={}",
Expand Down
15 changes: 9 additions & 6 deletions hrun/tasks_required/hrun_admin/src/hrun_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,16 @@ class Server : public TaskLib {
while (true) {
// Make all workers flush locally
int count = 0;
for (std::unique_ptr<Worker> &worker : HRUN_WORK_ORCHESTRATOR->workers_) {
worker->flush_.count_ = 0;
worker->flush_.flushing_ = true;
while (worker->flush_.flushing_) {
task->Yield<TASK_YIELD_CO>();
for (int i = 0; i < 2; ++i) {
for (std::unique_ptr<Worker>
&worker : HRUN_WORK_ORCHESTRATOR->workers_) {
worker->flush_.count_ = 0;
worker->flush_.flushing_ = true;
while (worker->flush_.flushing_) {
task->Yield<TASK_YIELD_CO>();
}
count += worker->flush_.count_;
}
count += worker->flush_.count_;
}
if (!count) {
break;
Expand Down

0 comments on commit 37dfeb9

Please sign in to comment.