Skip to content

Commit

Permalink
manual code mod to ensure all logs are prefixed correctly
Browse files Browse the repository at this point in the history
Summary:
added WPLOG and WTPLOG macros - renamed WTVLOG for consistency
checked:
hg grep "LOG(" |egrep -v "W.*LOG\("|less
such as only bench/ and tests are using naked *LOG without W prefix

Reviewed By: jjleng

Differential Revision: D4370903

fbshipit-source-id: 33e462cea4520f89ded5969c0869c32d9a7c47fb
  • Loading branch information
ldemailly authored and facebook-github-bot committed Dec 28, 2016
1 parent 9338ad5 commit f658b66
Show file tree
Hide file tree
Showing 18 changed files with 158 additions and 154 deletions.
4 changes: 3 additions & 1 deletion ErrorCodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ namespace wdt {
#define WDT_LOG_PREFIX "wdt>\t"
#define WLOG(X) LOG(X) << WDT_LOG_PREFIX
#define WVLOG(X) VLOG(X) << WDT_LOG_PREFIX
#define WPLOG(X) PLOG(X) << WDT_LOG_PREFIX
#define WLOG_IF(X, Y) LOG_IF(X, Y) << WDT_LOG_PREFIX
#define WVLOG_IF(X, Y) VLOG_IF(X, Y) << WDT_LOG_PREFIX
#define WTLOG(X) WLOG(X) << *this << " "
#define WVTLOG(X) WVLOG(X) << *this << " "
#define WTVLOG(X) WVLOG(X) << *this << " "
#define WTPLOG(X) WPLOG(X) << *this << " "

// For now just does regular check, for some library embedding may consider
// skipping or being DCHECK
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ doesn't depend on that

gtest (google testing) but only for tests

glog (google logging library)
glog (google logging library) - use W*LOG macros so everything logged by WDT
is always prefixed by "wdt>" which helps when embedded in another service

Parts of Facebook's Folly open source library (as set in the CMakefile)
Mostly conv, threadlocal and checksum support.
Expand Down
2 changes: 1 addition & 1 deletion Receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ const WdtTransferRequest &Receiver::init() {
if (ret == 0) {
transferRequest_.hostName.assign(hostName);
} else {
PLOG(ERROR) << "Couldn't find the host name";
WPLOG(ERROR) << "Couldn't find the local host name";
code = ERROR;
}
}
Expand Down
30 changes: 15 additions & 15 deletions ReceiverThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ int64_t readAtLeast(ServerSocket &s, char *buf, int64_t max, int64_t atLeast,
// read is false
int64_t n = s.read(buf + len, max - len, false);
if (n < 0) {
PLOG(ERROR) << "Read error on " << s.getPort() << " after " << count;
WPLOG(ERROR) << "Read error on " << s.getPort() << " after " << count;
if (len) {
return len;
} else {
Expand All @@ -68,7 +68,7 @@ int64_t readAtMost(ServerSocket &s, char *buf, int64_t max, int64_t atMost) {
// read is false
int64_t n = s.read(buf, target, false);
if (n < 0) {
PLOG(ERROR) << "Read error on " << s.getPort() << " with target " << target;
WPLOG(ERROR) << "Read error on " << s.getPort() << " target " << target;
return n;
}
if (n == 0) {
Expand Down Expand Up @@ -107,7 +107,7 @@ ReceiverThread::ReceiverThread(Receiver *wdtParent, int threadIndex,

/**LISTEN STATE***/
ReceiverState ReceiverThread::listen() {
WVTLOG(1) << "entered LISTEN state";
WTVLOG(1) << "entered LISTEN state";
const bool doActualWrites = !options_.skip_writes;
int32_t port = socket_->getPort();
WVLOG(1) << "Server Thread for port " << port << " with backlog "
Expand Down Expand Up @@ -137,7 +137,7 @@ ReceiverState ReceiverThread::listen() {

/***ACCEPT_FIRST_CONNECTION***/
ReceiverState ReceiverThread::acceptFirstConnection() {
WVTLOG(1) << "entered ACCEPT_FIRST_CONNECTION state";
WTVLOG(1) << "entered ACCEPT_FIRST_CONNECTION state";

reset();
socket_->closeNoCheck();
Expand All @@ -160,7 +160,7 @@ ReceiverState ReceiverThread::acceptFirstConnection() {
break;
}
case Receiver::AcceptMode::ACCEPT_FOREVER: {
WVTLOG(2) << "Receiver is configured to accept for-ever";
WTVLOG(2) << "Receiver is configured to accept for-ever";
break;
}
case Receiver::AcceptMode::STOP_ACCEPTING: {
Expand Down Expand Up @@ -256,7 +256,7 @@ ReceiverState ReceiverThread::sendLocalCheckpoint() {

/***READ_NEXT_CMD***/
ReceiverState ReceiverThread::readNextCmd() {
WVTLOG(1) << "entered READ_NEXT_CMD state";
WTVLOG(1) << "entered READ_NEXT_CMD state";
oldOffset_ = off_;
// TODO: we shouldn't have off_ here and buffer/size inside buffer.
numRead_ = readAtLeast(*socket_, buf_ + off_, bufSize_ - off_,
Expand Down Expand Up @@ -287,7 +287,7 @@ ReceiverState ReceiverThread::readNextCmd() {

/***PROCESS_SETTINGS_CMD***/
ReceiverState ReceiverThread::processSettingsCmd() {
WVTLOG(1) << "entered PROCESS_SETTINGS_CMD state";
WTVLOG(1) << "entered PROCESS_SETTINGS_CMD state";
Settings settings;
int senderProtocolVersion;

Expand Down Expand Up @@ -371,7 +371,7 @@ ReceiverState ReceiverThread::processSettingsCmd() {

/***PROCESS_FILE_CMD***/
ReceiverState ReceiverThread::processFileCmd() {
WVTLOG(1) << "entered PROCESS_FILE_CMD state";
WTVLOG(1) << "entered PROCESS_FILE_CMD state";
// following block needs to be executed for the first file cmd. There is no
// harm in executing it more than once. number of blocks equal to 0 is a good
// approximation for first file cmd. Did not want to introduce another boolean
Expand All @@ -396,7 +396,7 @@ ReceiverState ReceiverThread::processFileCmd() {
ErrorCode transferStatus = (ErrorCode)buf_[off_++];
if (transferStatus != OK) {
// TODO: use this status information to implement fail fast mode
WVTLOG(1) << "sender entered into error state "
WTVLOG(1) << "sender entered into error state "
<< errorCodeToStr(transferStatus);
}
int16_t headerLen = folly::loadUnaligned<int16_t>(buf_ + off_);
Expand Down Expand Up @@ -444,7 +444,7 @@ ReceiverState ReceiverThread::processFileCmd() {

// received a well formed file cmd, apply the pending checkpoint update
checkpointIndex_ = pendingCheckpointIndex_;
WVTLOG(1) << "Read id:" << blockDetails.fileName
WTVLOG(1) << "Read id:" << blockDetails.fileName
<< " size:" << blockDetails.dataSize << " ooff:" << oldOffset_
<< " off_: " << off_ << " numRead_: " << numRead_;
auto &fileCreator = wdtParent_->getFileCreator();
Expand Down Expand Up @@ -645,7 +645,7 @@ void ReceiverThread::markReceivedBlocksVerified() {
}

ReceiverState ReceiverThread::processDoneCmd() {
WVTLOG(1) << "entered PROCESS_DONE_CMD state";
WTVLOG(1) << "entered PROCESS_DONE_CMD state";
if (numRead_ != Protocol::kMinBufLength) {
WTLOG(ERROR) << "Unexpected state for done command"
<< " off_: " << off_ << " numRead_: " << numRead_;
Expand Down Expand Up @@ -674,7 +674,7 @@ ReceiverState ReceiverThread::processDoneCmd() {
}

ReceiverState ReceiverThread::processSizeCmd() {
WVTLOG(1) << "entered PROCESS_SIZE_CMD state";
WTVLOG(1) << "entered PROCESS_SIZE_CMD state";
int64_t totalSenderBytes;
bool success = Protocol::decodeSize(
buf_, off_, oldOffset_ + Protocol::kMaxSize, totalSenderBytes);
Expand Down Expand Up @@ -832,10 +832,10 @@ ReceiverState ReceiverThread::sendAbortCmd() {
}

ReceiverState ReceiverThread::sendDoneCmd() {
WVTLOG(1) << "entered SEND_DONE_CMD state";
WTVLOG(1) << "entered SEND_DONE_CMD state";
buf_[0] = Protocol::DONE_CMD;
if (socket_->write(buf_, 1) != 1) {
PLOG(ERROR) << *this << " unable to send DONE " << threadIndex_;
WTPLOG(ERROR) << "unable to send DONE " << threadIndex_;
threadStats_.setLocalErrorCode(SOCKET_WRITE_ERROR);
return ACCEPT_WITH_TIMEOUT;
}
Expand Down Expand Up @@ -928,7 +928,7 @@ ReceiverState ReceiverThread::waitForFinishOrNewCheckpoint() {
// send WAIT cmd to keep sender thread alive
buf_[0] = Protocol::WAIT_CMD;
if (socket_->write(buf_, 1) != 1) {
PLOG(ERROR) << *this << " unable to write WAIT ";
WTPLOG(ERROR) << "unable to write WAIT";
threadStats_.setLocalErrorCode(SOCKET_WRITE_ERROR);
controller_->markState(threadIndex_, RUNNING);
return ACCEPT_WITH_TIMEOUT;
Expand Down
44 changes: 22 additions & 22 deletions SenderThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ std::unique_ptr<ClientSocket> SenderThread::connectToReceiver(
}
if (i != maxRetries) {
// sleep between attempts but not after the last
WVTLOG(1) << "Sleeping after failed attempt " << i;
WTVLOG(1) << "Sleeping after failed attempt " << i;
/* sleep override */ usleep(retryInterval * 1000);
}
}
Expand All @@ -89,7 +89,7 @@ std::unique_ptr<ClientSocket> SenderThread::connectToReceiver(
}

SenderState SenderThread::connect() {
WVTLOG(1) << "entered CONNECT state";
WTVLOG(1) << "entered CONNECT state";
if (socket_) {
ErrorCode socketErrCode = socket_->getNonRetryableErrCode();
if (socketErrCode != OK) {
Expand Down Expand Up @@ -168,7 +168,7 @@ SenderState SenderThread::readLocalCheckPoint() {
}
const Checkpoint &checkpoint = checkpoints[0];
auto numBlocks = checkpoint.numBlocks;
WVTLOG(1) << "received local checkpoint " << checkpoint;
WTVLOG(1) << "received local checkpoint " << checkpoint;

if (numBlocks == -1) {
// Receiver failed while sending DONE cmd
Expand All @@ -189,7 +189,7 @@ SenderState SenderThread::readLocalCheckPoint() {
}

SenderState SenderThread::sendSettings() {
WVTLOG(1) << "entered SEND_SETTINGS state";
WTVLOG(1) << "entered SEND_SETTINGS state";
int64_t readTimeoutMillis = options_.read_timeout_millis;
int64_t writeTimeoutMillis = options_.write_timeout_millis;
int64_t off = 0;
Expand All @@ -216,7 +216,7 @@ SenderState SenderThread::sendSettings() {
}

SenderState SenderThread::sendBlocks() {
WVTLOG(1) << "entered SEND_BLOCKS state";
WTVLOG(1) << "entered SEND_BLOCKS state";
ThreadTransferHistory &transferHistory = getTransferHistory();
if (threadProtocolVersion_ >= Protocol::RECEIVER_PROGRESS_REPORT_VERSION &&
!totalSizeSent_ && dirQueue_->fileDiscoveryFinished()) {
Expand Down Expand Up @@ -272,10 +272,10 @@ TransferStats SenderThread::sendOneByteSource(
folly::storeUnaligned<int16_t>(headerLenPtr, littleEndianOff);
int64_t written = socket_->write(headerBuf, off);
if (written != off) {
PLOG(ERROR) << "Write error/mismatch " << written << " " << off
<< ". fd = " << socket_->getFd()
<< ". file = " << metadata.relPath
<< ". port = " << socket_->getPort();
WTPLOG(ERROR) << "Write error/mismatch " << written << " " << off
<< ". fd = " << socket_->getFd()
<< ". file = " << metadata.relPath
<< ". port = " << socket_->getPort();
stats.setLocalErrorCode(SOCKET_WRITE_ERROR);
stats.incrFailedAttempts();
return stats;
Expand All @@ -284,7 +284,7 @@ TransferStats SenderThread::sendOneByteSource(
int64_t byteSourceHeaderBytes = written;
int64_t throttlerInstanceBytes = byteSourceHeaderBytes;
int64_t totalThrottlerBytes = 0;
WVTLOG(3) << "Sent " << written << " on " << socket_->getFd() << " : "
WTVLOG(3) << "Sent " << written << " on " << socket_->getFd() << " : "
<< folly::humanify(std::string(headerBuf, off));
int32_t checksum = 0;
while (!source->finished()) {
Expand Down Expand Up @@ -341,7 +341,7 @@ TransferStats SenderThread::sendOneByteSource(
<< " " << actualSize;
struct stat fileStat;
if (stat(metadata.fullPath.c_str(), &fileStat) != 0) {
PLOG(ERROR) << "stat failed on path " << metadata.fullPath;
WTPLOG(ERROR) << "stat failed on path " << metadata.fullPath;
} else {
WTLOG(WARNING) << "file " << source->getIdentifier() << " previous size "
<< metadata.size << " current size " << fileStat.st_size;
Expand Down Expand Up @@ -379,7 +379,7 @@ TransferStats SenderThread::sendOneByteSource(
}

SenderState SenderThread::sendSizeCmd() {
WVTLOG(1) << "entered SEND_SIZE_CMD state";
WTVLOG(1) << "entered SEND_SIZE_CMD state";
int64_t off = 0;
buf_[off++] = Protocol::SIZE_CMD;

Expand All @@ -397,7 +397,7 @@ SenderState SenderThread::sendSizeCmd() {
}

SenderState SenderThread::sendDoneCmd() {
WVTLOG(1) << "entered SEND_DONE_CMD state";
WTVLOG(1) << "entered SEND_DONE_CMD state";

int64_t off = 0;
buf_[off++] = Protocol::DONE_CMD;
Expand All @@ -415,7 +415,7 @@ SenderState SenderThread::sendDoneCmd() {
return CHECK_FOR_ABORT;
}
threadStats_.addHeaderBytes(toWrite);
WVTLOG(1) << "Wrote done cmd on " << socket_->getFd()
WTVLOG(1) << "Wrote done cmd on " << socket_->getFd()
<< " waiting for reply...";
return READ_RECEIVER_CMD;
}
Expand All @@ -424,12 +424,12 @@ SenderState SenderThread::checkForAbort() {
WTLOG(INFO) << "entered CHECK_FOR_ABORT state";
auto numRead = socket_->read(buf_, 1);
if (numRead != 1) {
WVTLOG(1) << "No abort cmd found";
WTVLOG(1) << "No abort cmd found";
return CONNECT;
}
Protocol::CMD_MAGIC cmd = (Protocol::CMD_MAGIC)buf_[0];
if (cmd != Protocol::ABORT_CMD) {
WVTLOG(1) << "Unexpected result found while reading for abort " << buf_[0];
WTVLOG(1) << "Unexpected result found while reading for abort " << buf_[0];
return CONNECT;
}
threadStats_.addHeaderBytes(1);
Expand Down Expand Up @@ -568,7 +568,7 @@ ErrorCode SenderThread::readNextReceiverCmd() {
return ABORT;
}
if (numRead == 0) {
PLOG(ERROR) << "Got unexpected EOF, reconnecting";
WTPLOG(ERROR) << "Got unexpected EOF, reconnecting";
return SOCKET_READ_ERROR;
}
WDT_CHECK_LT(numRead, 0);
Expand Down Expand Up @@ -616,7 +616,7 @@ ErrorCode SenderThread::readNextReceiverCmd() {
}

SenderState SenderThread::readReceiverCmd() {
WVTLOG(1) << "entered READ_RECEIVER_CMD state";
WTVLOG(1) << "entered READ_RECEIVER_CMD state";

ErrorCode errCode = readNextReceiverCmd();
if (errCode != OK) {
Expand Down Expand Up @@ -687,7 +687,7 @@ ErrorCode SenderThread::readAndVerifySpuriousCheckpoint() {
}

SenderState SenderThread::processDoneCmd() {
WVTLOG(1) << "entered PROCESS_DONE_CMD state";
WTVLOG(1) << "entered PROCESS_DONE_CMD state";
// DONE cmd implies that all the blocks sent till now is acked
ThreadTransferHistory &transferHistory = getTransferHistory();
transferHistory.markAllAcknowledged();
Expand All @@ -704,7 +704,7 @@ SenderState SenderThread::processDoneCmd() {
threadStats_.setLocalErrorCode(retCode);
return CONNECT;
}
WVTLOG(1) << "done with transfer, port " << port_;
WTVLOG(1) << "done with transfer, port " << port_;
return END;
}

Expand All @@ -713,7 +713,7 @@ SenderState SenderThread::processWaitCmd() {
// similar to DONE, WAIT also verifies all the blocks
ThreadTransferHistory &transferHistory = getTransferHistory();
transferHistory.markAllAcknowledged();
WVTLOG(1) << "received WAIT_CMD, port " << port_;
WTVLOG(1) << "received WAIT_CMD, port " << port_;
return READ_RECEIVER_CMD;
}

Expand Down Expand Up @@ -812,7 +812,7 @@ SenderState SenderThread::processVersionMismatch() {
// have been collected
auto barrier = controller_->getBarrier(VERSION_MISMATCH_BARRIER);
barrier->execute();
WVTLOG(1) << "cleared the protocol version barrier";
WTVLOG(1) << "cleared the protocol version barrier";
auto execFunnel = controller_->getFunnel(VERSION_MISMATCH_FUNNEL);
while (true) {
auto status = execFunnel->getStatus();
Expand Down
2 changes: 1 addition & 1 deletion test/NetworkErrorSimulator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ void simulateNetworkError() {
int fd = 3 + rand32() % (2 * options.num_ports + 1);
// close the chosen socket
if (shutdown(fd, SHUT_WR) < 0) {
PLOG(WARNING) << "socket shutdown failed for fd " << fd;
WPLOG(WARNING) << "socket shutdown failed for fd " << fd;
} else {
WLOG(INFO) << "successfully shut down socket for fd " << fd;
}
Expand Down
2 changes: 1 addition & 1 deletion test/WdtMiscTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ TEST(BasicTest, MultiWdtSender) {
mkdir("/tmp/wdtTest", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
char baseDir[] = "/tmp/wdtTest/XXXXXX";
if (!mkdtemp(baseDir)) {
PLOG(FATAL) << "unable to make " << baseDir;
WPLOG(FATAL) << "unable to make " << baseDir;
}
LOG(INFO) << "Testing in " << baseDir;
string srcDir(baseDir);
Expand Down
Loading

0 comments on commit f658b66

Please sign in to comment.