Skip to content

Commit

Permalink
fix bug in thread pool executor
Browse files Browse the repository at this point in the history
  • Loading branch information
marenz2569 committed Jan 28, 2024
1 parent 53830b8 commit 643fd8b
Showing 1 changed file with 23 additions and 21 deletions.
44 changes: 23 additions & 21 deletions src/streaming_ordered_output_thread_pool_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,13 @@ template <typename ReturnType> void StreamingOrderedOutputThreadPoolExecutor<Ret
for (;;) {
std::optional<std::pair<uint64_t, std::function<ReturnType()>>> work{};

if (stop) {
std::lock_guard lk(cv_input_item_m);
if (inputQueue.size() == 0)
break;
}

{
std::lock_guard lk(cv_input_item_m);
if (!inputQueue.empty()) {
work = inputQueue.front();
inputQueue.pop_front();
}
} else if (stop)
break;
}

if (!work.has_value()) {
Expand Down Expand Up @@ -94,30 +89,37 @@ void StreamingOrderedOutputThreadPoolExecutor<ReturnType>::queueWork(std::functi
}

template <typename ReturnType> ReturnType StreamingOrderedOutputThreadPoolExecutor<ReturnType>::get() {
std::optional<ReturnType> result{};
for (;;) {
std::optional<ReturnType> result{};

{
std::lock_guard<std::mutex> lk(cv_output_item_m);

while (!result.has_value()) {
std::unique_lock<std::mutex> lk(cv_output_item_m);
cv_output_item.wait_for(lk, 10ms, [&] {
// find the output item and if found set outputCounter_ to the next item
if (auto search = outputMap.find(outputCounter); search != outputMap.end()) {
result = search->second;
outputMap.erase(search);
outputCounter++;
return true;
}
}

return false;
});
if (!result.has_value()) {
std::unique_lock<std::mutex> lk(cv_output_item_m);
auto res = cv_output_item.wait_for(lk, 10ms, [&] {
// find the output item and if found set outputCounter_ to the next item
if (auto search = outputMap.find(outputCounter); search != outputMap.end()) {
result = search->second;
outputMap.erase(search);
outputCounter++;
return true;
}

if (auto search = outputMap.find(outputCounter); search != outputMap.end()) {
result = search->second;
outputMap.erase(search);
outputCounter++;
return false;
});
}
}

return *result;
if (result.has_value())
return *result;
}
}

template class StreamingOrderedOutputThreadPoolExecutor<std::vector<std::function<void()>>>;

0 comments on commit 643fd8b

Please sign in to comment.