Skip to content

Commit

Permalink
Bugfix: Ensure waitForPreviousTransfer exits during error
Browse files Browse the repository at this point in the history
Summary:
The condition used previously in waitForPreviousTransfer was incorrect
and could block the thread indefinitely if some of the sender threads
exited due to errors. This diff fixes the behavior and also adds progress
reporting for better visibility.

Differential Revision: D18921721

fbshipit-source-id: 7e21c7f9b958470ab87c51cecb4e9cd5ae2bcaee
  • Loading branch information
sarangbh authored and facebook-github-bot committed Dec 15, 2019
1 parent 0184e65 commit 8a0273c
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 59 deletions.
24 changes: 20 additions & 4 deletions Sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,24 @@ std::unique_ptr<TransferReport> Sender::transfer() {
return finish();
}

/// Fetch the next batch of files from transferRequest_.fileInfoGenerator.
///
/// Before asking the generator, this waits on
/// DirectorySourceQueue::waitForPreviousTransfer, passing the configured
/// progress report interval and a callback that reports how many sender
/// threads are currently RUNNING. The call can therefore block.
///
/// @return folly::none if the transfer status is anything other than ONGOING
///         once the wait returns; otherwise whatever the generator returns
///         (which may itself be folly::none).
folly::Optional<std::vector<WdtFileInfo>>
Sender::getFilesFromFileInfoGenerator() {
  // Handed over as a callback, not a snapshot: the queue invokes it again
  // while waiting, so threads that stop running in the meantime are noticed.
  auto runningThreads = [this]() {
    return threadsController_->numRunningThreads();
  };
  dirQueue_->waitForPreviousTransfer(
      std::chrono::milliseconds(options_.progress_report_interval_millis),
      runningThreads);

  // Do not ask for more work once the transfer has left the ONGOING state.
  const auto status = getTransferStatus();
  if (status != ONGOING) {
    WLOG(INFO) << "Terminating transfer since status isn't ONGOING. "
               << "Status: " << status;
    return folly::none;
  }
  return transferRequest_.fileInfoGenerator();
}

ErrorCode Sender::start() {
{
std::lock_guard<std::mutex> lock(mutex_);
Expand Down Expand Up @@ -297,10 +315,8 @@ ErrorCode Sender::start() {
transferRequest_.disableDirectoryTraversal) {
dirQueue_->setFileInfo(transferRequest_.fileInfo);
if (transferRequest_.fileInfoGenerator) {
dirQueue_->setFileInfoGenerator([this]() {
dirQueue_->waitForPreviousTransfer();
return transferRequest_.fileInfoGenerator();
});
dirQueue_->setFileInfoGenerator(
[this]() { return getFilesFromFileInfoGenerator(); });
}
}
transferHistoryController_ =
Expand Down
8 changes: 7 additions & 1 deletion Sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class Sender : public WdtBase {
private:
friend class SenderThread;
friend class QueueAbortChecker;
FRIEND_TEST(SenderTest, FileInfoGenerator);
friend class SenderTests;

/// Validate the transfer request
ErrorCode validateTransferRequest() override;
Expand Down Expand Up @@ -167,6 +167,12 @@ class Sender : public WdtBase {

void logPerfStats() const override;

/// Get the files from fileInfoGenerator if it's configured in transferRequest
/// Note: This call may block on DirectorySourceQueue::waitForPreviousTransfer
///
/// @return list of files or folly::none indicating no more files
folly::Optional<std::vector<WdtFileInfo>> getFilesFromFileInfoGenerator();

/// Pointer to DirectorySourceQueue which reads the srcDir and the files
std::unique_ptr<DirectorySourceQueue> dirQueue_;
/// Number of active threads, decremented every time a thread is finished
Expand Down
116 changes: 67 additions & 49 deletions test/SenderTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,59 +41,77 @@ void createFile(const std::string& path, size_t size) {

} // namespace

TEST(SenderTest, FileInfoGenerator) {
auto senderDir = createTmpDir();
auto receiverDir = createTmpDir();

// create 10 files
std::vector<WdtFileInfo> fileInfo;
std::vector<size_t> cumulativeSize;
cumulativeSize.push_back(0);
const size_t numFiles = 3;
const uint32_t maxFileSize = 1000000;
for (size_t i = 0; i < numFiles; i++) {
auto file = senderDir / std::to_string(i);
size_t fileSz = folly::Random::rand32() % maxFileSize;
createFile(file.c_str(), fileSz);
fileInfo.push_back({std::to_string(i), -1, false});
cumulativeSize.push_back(cumulativeSize.back() + fileSz);
}
class SenderTests {
public:
static void runFileInfoGenTest(bool failReceiver) {
auto senderDir = createTmpDir();
auto receiverDir = createTmpDir();

// create 10 files
std::vector<WdtFileInfo> fileInfo;
std::vector<size_t> cumulativeSize;
cumulativeSize.push_back(0);
const size_t numFiles = 3;
const uint32_t maxFileSize = 1000000;
for (size_t i = 0; i < numFiles; i++) {
auto file = senderDir / std::to_string(i);
size_t fileSz = folly::Random::rand32() % maxFileSize;
createFile(file.c_str(), fileSz);
fileInfo.push_back({std::to_string(i), -1, false});
cumulativeSize.push_back(cumulativeSize.back() + fileSz);
}

auto receiver = std::make_unique<Receiver>(0, 3, receiverDir.c_str());
auto req = receiver->init();
if (!failReceiver) {
receiver->transferAsync();
} else {
// this will call abort
receiver.reset();
}

auto receiver = std::make_unique<Receiver>(0, 3, receiverDir.c_str());
auto req = receiver->init();
receiver->transferAsync();

std::unique_ptr<Sender> sender;
size_t nextFile = 0;
req.disableDirectoryTraversal = true;
req.directory = senderDir.c_str();
req.fileInfo = std::vector<WdtFileInfo>{fileInfo[nextFile++]};
req.fileInfoGenerator = [&]() -> folly::Optional<std::vector<WdtFileInfo>> {
auto stats = sender->getGlobalTransferStats();
EXPECT_EQ(cumulativeSize[nextFile], stats.getDataBytes());
if (nextFile < fileInfo.size()) {
return std::vector<WdtFileInfo>{fileInfo[nextFile++]};
std::unique_ptr<Sender> sender;
size_t nextFile = 0;
req.disableDirectoryTraversal = true;
req.directory = senderDir.c_str();
req.fileInfo = std::vector<WdtFileInfo>{fileInfo[nextFile++]};
req.fileInfoGenerator = [&]() -> folly::Optional<std::vector<WdtFileInfo>> {
auto stats = sender->getGlobalTransferStats();
EXPECT_EQ(cumulativeSize[nextFile], stats.getDataBytes());
if (nextFile < fileInfo.size()) {
return std::vector<WdtFileInfo>{fileInfo[nextFile++]};
}
return folly::none;
};
sender = std::make_unique<Sender>(req);
sender->transfer();

if (!failReceiver) {
receiver->finish();
for (size_t i = 0; i < numFiles; i++) {
auto sentPath = senderDir / std::to_string(i);
std::string sent;
EXPECT_TRUE(folly::readFile(sentPath.c_str(), sent));

auto recvPath = receiverDir / std::to_string(i);
std::string recv;
EXPECT_TRUE(folly::readFile(recvPath.c_str(), recv));

EXPECT_EQ(sent, recv);
}
}
return folly::none;
};
sender = std::make_unique<Sender>(req);
sender->transfer();
receiver->finish();

for (size_t i = 0; i < numFiles; i++) {
auto sentPath = senderDir / std::to_string(i);
std::string sent;
EXPECT_TRUE(folly::readFile(sentPath.c_str(), sent));

auto recvPath = receiverDir / std::to_string(i);
std::string recv;
EXPECT_TRUE(folly::readFile(recvPath.c_str(), recv));

EXPECT_EQ(sent, recv);

boost::filesystem::remove_all(senderDir);
boost::filesystem::remove_all(receiverDir);
}
};

/// Runs the file-info-generator scenario with the receiver left running
/// (failReceiver == false).
TEST(SenderTest, FileInfoGenerator) {
  SenderTests::runFileInfoGenTest(/* failReceiver = */ false);
}

boost::filesystem::remove_all(senderDir);
boost::filesystem::remove_all(receiverDir);
/// Runs the file-info-generator scenario with failReceiver == true, i.e. the
/// receiver is destroyed before the sender starts its transfer.
TEST(SenderTest, FileInfoGeneratorReceiverError) {
  SenderTests::runFileInfoGenTest(/* failReceiver = */ true);
}

} // namespace wdt
Expand Down
28 changes: 25 additions & 3 deletions util/DirectorySourceQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -722,10 +722,32 @@ std::unique_ptr<ByteSource> DirectorySourceQueue::getNextSource(
}
}

// Blocks the caller until the source queue is empty and numWaiters_ is no
// longer below the active-thread count reported by numActiveThreadsFn, or
// until the abort checker reports that the wait should stop.
//
// progressReportInterval: upper bound on each individual wait; a progress
//                         line is logged before every wait.
// numActiveThreadsFn:     callback returning the current number of active
//                         threads; it is only ever invoked with mutex_
//                         released.
void DirectorySourceQueue::waitForPreviousTransfer() {
void DirectorySourceQueue::waitForPreviousTransfer(
std::chrono::milliseconds progressReportInterval,
std::function<int64_t()> numActiveThreadsFn) {

// don't call into numActiveThreadsFn with lock held
int64_t numActiveThreads = numActiveThreadsFn();

std::unique_lock<std::mutex> lock(mutex_);
while (!sourceQueue_.empty() || numWaiters_ < numClientThreads_) {
conditionPrevTransfer_.wait(lock);
// Keep waiting while sources are still queued or fewer threads are waiting
// than are currently active. The condition is evaluated with mutex_ held.
while (!sourceQueue_.empty() || numWaiters_ < numActiveThreads) {
WLOG(INFO) << "Waiting for previous transfer."
<< "; Queue Size: " << sourceQueue_.size()
<< "; Active threads: " << numActiveThreads
<< "; Num Waiters: " << numWaiters_;
// Timed wait: control returns here after at most progressReportInterval even
// if the condition variable is never notified, so the checks below always
// get a chance to run.
conditionPrevTransfer_.wait_for(lock, progressReportInterval);

// Release lock while calling into abort checker or numActiveThreadsFn
lock.unlock();

if (threadCtx_->getAbortChecker()->shouldAbort()) {
WLOG(INFO) << "Aborting directory thread...";
// Leaving the loop with the lock released is fine: std::unique_lock only
// unlocks in its destructor when it still owns the mutex.
break;
}
// Refresh the count so that threads which are no longer active stop being
// waited for.
numActiveThreads = numActiveThreadsFn();

// re-acquire lock
lock.lock();
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions util/DirectorySourceQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,16 @@ class DirectorySourceQueue : public SourceQueue {
/**
* Allows the caller to block until all the previous transfers have
* finished, before invoking fileInfoGenerator_ to get the next batch.
* NOTE: This uses numClientThreads_ to get the number of clients pulling
* NOTE: This uses numActiveThreadsFn() to get the number of clients pulling
* from the queue and the size of queue to determine if transfers have
* finished.
*
* @param progressReportInterval report progress every
* progressReportInterval milliseconds.
* @param numActiveThreadsFn Func to get number of active threads
*/
void waitForPreviousTransfer();
void waitForPreviousTransfer(std::chrono::milliseconds progressReportInterval,
std::function<int64_t()> numActiveThreadsFn);

private:
/**
Expand Down
11 changes: 11 additions & 0 deletions util/ThreadsController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,5 +267,16 @@ void ThreadsController::setNumFunnels(int numFunnels) {
funnelExecutors_.push_back(make_shared<Funnel>());
}
}

/// Count the entries in threadStateMap_ whose state equals RUNNING.
///
/// A GuardLock is constructed on controllerMutex_ before the map is scanned
/// (presumably a scoped lock held until return; confirm against its typedef).
///
/// @return number of threads currently in the RUNNING state
int ThreadsController::numRunningThreads() {
  GuardLock lock(controllerMutex_);
  int runningCount = 0;
  for (const auto& entry : threadStateMap_) {
    runningCount += (entry.second == RUNNING) ? 1 : 0;
  }
  return runningCount;
}
}
}
3 changes: 3 additions & 0 deletions util/ThreadsController.h
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,9 @@ class ThreadsController {
/// Get the number of registered threads
int getTotalThreads();

/// Get the number of threads with status RUNNING
int numRunningThreads();

/// Reset the thread controller so that same instance can be used again
void reset();

Expand Down

0 comments on commit 8a0273c

Please sign in to comment.