From f658b66ee6d0cfbb590f6aec89229fdb2968c113 Mon Sep 17 00:00:00 2001 From: Laurent Demailly Date: Wed, 28 Dec 2016 14:02:22 -0800 Subject: [PATCH] manual code mod to ensure all logs are prefixed correctly Summary: added WPLOG and WTPLOG macros - renamed WTVLOG for consistency checked: hg grep "LOG(" |egrep -v "W.*LOG\("|less such as only bench/ and tests are using naked *LOG without W prefix Reviewed By: jjleng Differential Revision: D4370903 fbshipit-source-id: 33e462cea4520f89ded5969c0869c32d9a7c47fb --- ErrorCodes.h | 4 ++- README.md | 3 +- Receiver.cpp | 2 +- ReceiverThread.cpp | 30 ++++++++-------- SenderThread.cpp | 44 ++++++++++++------------ test/NetworkErrorSimulator.cpp | 2 +- test/WdtMiscTests.cpp | 2 +- util/ClientSocket.cpp | 20 +++++------ util/DirectorySourceQueue.cpp | 14 ++++---- util/FileByteSource.cpp | 18 +++++----- util/FileCreator.cpp | 18 +++++----- util/FileWriter.cpp | 30 ++++++++-------- util/SerializationUtil.cpp | 4 +-- util/SerializationUtil.h | 12 +++---- util/ServerSocket.cpp | 27 ++++++++------- util/ThreadsController.cpp | 4 +-- util/TransferLogManager.cpp | 62 +++++++++++++++++----------------- util/WdtSocket.cpp | 16 ++++----- 18 files changed, 158 insertions(+), 154 deletions(-) diff --git a/ErrorCodes.h b/ErrorCodes.h index ce1766f7..f726c640 100644 --- a/ErrorCodes.h +++ b/ErrorCodes.h @@ -19,10 +19,12 @@ namespace wdt { #define WDT_LOG_PREFIX "wdt>\t" #define WLOG(X) LOG(X) << WDT_LOG_PREFIX #define WVLOG(X) VLOG(X) << WDT_LOG_PREFIX +#define WPLOG(X) PLOG(X) << WDT_LOG_PREFIX #define WLOG_IF(X, Y) LOG_IF(X, Y) << WDT_LOG_PREFIX #define WVLOG_IF(X, Y) VLOG_IF(X, Y) << WDT_LOG_PREFIX #define WTLOG(X) WLOG(X) << *this << " " -#define WVTLOG(X) WVLOG(X) << *this << " " +#define WTVLOG(X) WVLOG(X) << *this << " " +#define WTPLOG(X) WPLOG(X) << *this << " " // For now just does regular check, for some library embedding may consider // skipping or being DCHECK diff --git a/README.md b/README.md index f4e2260e..adbb15c9 100644 --- a/README.md +++ b/README.md @@ -87,7 +87,8 @@ doesn't depend on that gtest (google testing) but only for tests -glog (google logging library) +glog (google logging library) - use W*LOG macros so everything logged by WDT +is always prefixed by "wdt>" which helps when embedded in another service Parts of Facebook's Folly open source library (as set in the CMakefile) Mostly conv, threadlocal and checksum support. diff --git a/Receiver.cpp b/Receiver.cpp index 84415930..d75bc3ba 100644 --- a/Receiver.cpp +++ b/Receiver.cpp @@ -232,7 +232,7 @@ const WdtTransferRequest &Receiver::init() { if (ret == 0) { transferRequest_.hostName.assign(hostName); } else { - PLOG(ERROR) << "Couldn't find the host name"; + WPLOG(ERROR) << "Couldn't find the local host name"; code = ERROR; } } diff --git a/ReceiverThread.cpp b/ReceiverThread.cpp index 07bac4cc..5fa9a054 100644 --- a/ReceiverThread.cpp +++ b/ReceiverThread.cpp @@ -41,7 +41,7 @@ int64_t readAtLeast(ServerSocket &s, char *buf, int64_t max, int64_t atLeast, // read is false int64_t n = s.read(buf + len, max - len, false); if (n < 0) { - PLOG(ERROR) << "Read error on " << s.getPort() << " after " << count; + WPLOG(ERROR) << "Read error on " << s.getPort() << " after " << count; if (len) { return len; } else { @@ -68,7 +68,7 @@ int64_t readAtMost(ServerSocket &s, char *buf, int64_t max, int64_t atMost) { // read is false int64_t n = s.read(buf, target, false); if (n < 0) { - PLOG(ERROR) << "Read error on " << s.getPort() << " with target " << target; + WPLOG(ERROR) << "Read error on " << s.getPort() << " target " << target; return n; } if (n == 0) { @@ -107,7 +107,7 @@ ReceiverThread::ReceiverThread(Receiver *wdtParent, int threadIndex, /**LISTEN STATE***/ ReceiverState ReceiverThread::listen() { - WVTLOG(1) << "entered LISTEN state"; + WTVLOG(1) << "entered LISTEN state"; const bool doActualWrites = !options_.skip_writes; int32_t port = socket_->getPort(); WVLOG(1) << "Server Thread for port " << port << " with backlog " @@ -137,7 +137,7 @@ ReceiverState ReceiverThread::listen() { /***ACCEPT_FIRST_CONNECTION***/ ReceiverState ReceiverThread::acceptFirstConnection() { - WVTLOG(1) << "entered ACCEPT_FIRST_CONNECTION state"; + WTVLOG(1) << "entered ACCEPT_FIRST_CONNECTION state"; reset(); socket_->closeNoCheck(); @@ -160,7 +160,7 @@ ReceiverState ReceiverThread::acceptFirstConnection() { break; } case Receiver::AcceptMode::ACCEPT_FOREVER: { - WVTLOG(2) << "Receiver is configured to accept for-ever"; + WTVLOG(2) << "Receiver is configured to accept for-ever"; break; } case Receiver::AcceptMode::STOP_ACCEPTING: { @@ -256,7 +256,7 @@ ReceiverState ReceiverThread::sendLocalCheckpoint() { /***READ_NEXT_CMD***/ ReceiverState ReceiverThread::readNextCmd() { - WVTLOG(1) << "entered READ_NEXT_CMD state"; + WTVLOG(1) << "entered READ_NEXT_CMD state"; oldOffset_ = off_; // TODO: we shouldn't have off_ here and buffer/size inside buffer. numRead_ = readAtLeast(*socket_, buf_ + off_, bufSize_ - off_, @@ -287,7 +287,7 @@ ReceiverState ReceiverThread::readNextCmd() { /***PROCESS_SETTINGS_CMD***/ ReceiverState ReceiverThread::processSettingsCmd() { - WVTLOG(1) << "entered PROCESS_SETTINGS_CMD state"; + WTVLOG(1) << "entered PROCESS_SETTINGS_CMD state"; Settings settings; int senderProtocolVersion; @@ -371,7 +371,7 @@ ReceiverState ReceiverThread::processSettingsCmd() { /***PROCESS_FILE_CMD***/ ReceiverState ReceiverThread::processFileCmd() { - WVTLOG(1) << "entered PROCESS_FILE_CMD state"; + WTVLOG(1) << "entered PROCESS_FILE_CMD state"; // following block needs to be executed for the first file cmd. There is no // harm in executing it more than once. number of blocks equal to 0 is a good // approximation for first file cmd. Did not want to introduce another boolean @@ -396,7 +396,7 @@ ReceiverState ReceiverThread::processFileCmd() { ErrorCode transferStatus = (ErrorCode)buf_[off_++]; if (transferStatus != OK) { // TODO: use this status information to implement fail fast mode - WVTLOG(1) << "sender entered into error state " + WTVLOG(1) << "sender entered into error state " << errorCodeToStr(transferStatus); } int16_t headerLen = folly::loadUnaligned(buf_ + off_); @@ -444,7 +444,7 @@ ReceiverState ReceiverThread::processFileCmd() { // received a well formed file cmd, apply the pending checkpoint update checkpointIndex_ = pendingCheckpointIndex_; - WVTLOG(1) << "Read id:" << blockDetails.fileName + WTVLOG(1) << "Read id:" << blockDetails.fileName << " size:" << blockDetails.dataSize << " ooff:" << oldOffset_ << " off_: " << off_ << " numRead_: " << numRead_; auto &fileCreator = wdtParent_->getFileCreator(); @@ -645,7 +645,7 @@ void ReceiverThread::markReceivedBlocksVerified() { } ReceiverState ReceiverThread::processDoneCmd() { - WVTLOG(1) << "entered PROCESS_DONE_CMD state"; + WTVLOG(1) << "entered PROCESS_DONE_CMD state"; if (numRead_ != Protocol::kMinBufLength) { WTLOG(ERROR) << "Unexpected state for done command" << " off_: " << off_ << " numRead_: " << numRead_; @@ -674,7 +674,7 @@ ReceiverState ReceiverThread::processDoneCmd() { } ReceiverState ReceiverThread::processSizeCmd() { - WVTLOG(1) << "entered PROCESS_SIZE_CMD state"; + WTVLOG(1) << "entered PROCESS_SIZE_CMD state"; int64_t totalSenderBytes; bool success = Protocol::decodeSize( buf_, off_, oldOffset_ + Protocol::kMaxSize, totalSenderBytes); @@ -832,10 +832,10 @@ ReceiverState ReceiverThread::sendAbortCmd() { } ReceiverState ReceiverThread::sendDoneCmd() { - WVTLOG(1) << "entered SEND_DONE_CMD state"; + WTVLOG(1) << "entered SEND_DONE_CMD state"; buf_[0] = Protocol::DONE_CMD; if (socket_->write(buf_, 1) != 1) { - PLOG(ERROR) << *this << " unable to send DONE " << threadIndex_; + WTPLOG(ERROR) << "unable to send DONE " << threadIndex_; threadStats_.setLocalErrorCode(SOCKET_WRITE_ERROR); return ACCEPT_WITH_TIMEOUT; } @@ -928,7 +928,7 @@ ReceiverState ReceiverThread::waitForFinishOrNewCheckpoint() { // send WAIT cmd to keep sender thread alive buf_[0] = Protocol::WAIT_CMD; if (socket_->write(buf_, 1) != 1) { - PLOG(ERROR) << *this << " unable to write WAIT "; + WTPLOG(ERROR) << "unable to write WAIT"; threadStats_.setLocalErrorCode(SOCKET_WRITE_ERROR); controller_->markState(threadIndex_, RUNNING); return ACCEPT_WITH_TIMEOUT; diff --git a/SenderThread.cpp b/SenderThread.cpp index 5de1c45f..c3662fbe 100644 --- a/SenderThread.cpp +++ b/SenderThread.cpp @@ -70,7 +70,7 @@ std::unique_ptr SenderThread::connectToReceiver( } if (i != maxRetries) { // sleep between attempts but not after the last - WVTLOG(1) << "Sleeping after failed attempt " << i; + WTVLOG(1) << "Sleeping after failed attempt " << i; /* sleep override */ usleep(retryInterval * 1000); } } @@ -89,7 +89,7 @@ std::unique_ptr SenderThread::connectToReceiver( } SenderState SenderThread::connect() { - WVTLOG(1) << "entered CONNECT state"; + WTVLOG(1) << "entered CONNECT state"; if (socket_) { ErrorCode socketErrCode = socket_->getNonRetryableErrCode(); if (socketErrCode != OK) { @@ -168,7 +168,7 @@ SenderState SenderThread::readLocalCheckPoint() { } const Checkpoint &checkpoint = checkpoints[0]; auto numBlocks = checkpoint.numBlocks; - WVTLOG(1) << "received local checkpoint " << checkpoint; + WTVLOG(1) << "received local checkpoint " << checkpoint; if (numBlocks == -1) { // Receiver failed while sending DONE cmd @@ -189,7 +189,7 @@ SenderState SenderThread::readLocalCheckPoint() { } SenderState SenderThread::sendSettings() { - WVTLOG(1) << "entered SEND_SETTINGS state"; + WTVLOG(1) << "entered SEND_SETTINGS state"; int64_t readTimeoutMillis = options_.read_timeout_millis; int64_t writeTimeoutMillis = options_.write_timeout_millis; int64_t off = 0; @@ -216,7 +216,7 @@ SenderState SenderThread::sendSettings() { } SenderState SenderThread::sendBlocks() { - WVTLOG(1) << "entered SEND_BLOCKS state"; + WTVLOG(1) << "entered SEND_BLOCKS state"; ThreadTransferHistory &transferHistory = getTransferHistory(); if (threadProtocolVersion_ >= Protocol::RECEIVER_PROGRESS_REPORT_VERSION && !totalSizeSent_ && dirQueue_->fileDiscoveryFinished()) { @@ -272,10 +272,10 @@ TransferStats SenderThread::sendOneByteSource( folly::storeUnaligned(headerLenPtr, littleEndianOff); int64_t written = socket_->write(headerBuf, off); if (written != off) { - PLOG(ERROR) << "Write error/mismatch " << written << " " << off - << ". fd = " << socket_->getFd() - << ". file = " << metadata.relPath - << ". port = " << socket_->getPort(); + WTPLOG(ERROR) << "Write error/mismatch " << written << " " << off + << ". fd = " << socket_->getFd() + << ". file = " << metadata.relPath + << ". port = " << socket_->getPort(); stats.setLocalErrorCode(SOCKET_WRITE_ERROR); stats.incrFailedAttempts(); return stats; @@ -284,7 +284,7 @@ TransferStats SenderThread::sendOneByteSource( int64_t byteSourceHeaderBytes = written; int64_t throttlerInstanceBytes = byteSourceHeaderBytes; int64_t totalThrottlerBytes = 0; - WVTLOG(3) << "Sent " << written << " on " << socket_->getFd() << " : " + WTVLOG(3) << "Sent " << written << " on " << socket_->getFd() << " : " << folly::humanify(std::string(headerBuf, off)); int32_t checksum = 0; while (!source->finished()) { @@ -341,7 +341,7 @@ TransferStats SenderThread::sendOneByteSource( << " " << actualSize; struct stat fileStat; if (stat(metadata.fullPath.c_str(), &fileStat) != 0) { - PLOG(ERROR) << "stat failed on path " << metadata.fullPath; + WTPLOG(ERROR) << "stat failed on path " << metadata.fullPath; } else { WTLOG(WARNING) << "file " << source->getIdentifier() << " previous size " << metadata.size << " current size " << fileStat.st_size; @@ -379,7 +379,7 @@ TransferStats SenderThread::sendOneByteSource( } SenderState SenderThread::sendSizeCmd() { - WVTLOG(1) << "entered SEND_SIZE_CMD state"; + WTVLOG(1) << "entered SEND_SIZE_CMD state"; int64_t off = 0; buf_[off++] = Protocol::SIZE_CMD; @@ -397,7 +397,7 @@ SenderState SenderThread::sendSizeCmd() { } SenderState SenderThread::sendDoneCmd() { - WVTLOG(1) << "entered SEND_DONE_CMD state"; + WTVLOG(1) << "entered SEND_DONE_CMD state"; int64_t off = 0; buf_[off++] = Protocol::DONE_CMD; @@ -415,7 +415,7 @@ SenderState SenderThread::sendDoneCmd() { return CHECK_FOR_ABORT; } threadStats_.addHeaderBytes(toWrite); - WVTLOG(1) << "Wrote done cmd on " << socket_->getFd() + WTVLOG(1) << "Wrote done cmd on " << socket_->getFd() << " waiting for reply..."; return READ_RECEIVER_CMD; } @@ -424,12 +424,12 @@ SenderState SenderThread::checkForAbort() { WTLOG(INFO) << "entered CHECK_FOR_ABORT state"; auto numRead = socket_->read(buf_, 1); if (numRead != 1) { - WVTLOG(1) << "No abort cmd found"; + WTVLOG(1) << "No abort cmd found"; return CONNECT; } Protocol::CMD_MAGIC cmd = (Protocol::CMD_MAGIC)buf_[0]; if (cmd != Protocol::ABORT_CMD) { - WVTLOG(1) << "Unexpected result found while reading for abort " << buf_[0]; + WTVLOG(1) << "Unexpected result found while reading for abort " << buf_[0]; return CONNECT; } threadStats_.addHeaderBytes(1); @@ -568,7 +568,7 @@ ErrorCode SenderThread::readNextReceiverCmd() { return ABORT; } if (numRead == 0) { - PLOG(ERROR) << "Got unexpected EOF, reconnecting"; + WTPLOG(ERROR) << "Got unexpected EOF, reconnecting"; return SOCKET_READ_ERROR; } WDT_CHECK_LT(numRead, 0); @@ -616,7 +616,7 @@ ErrorCode SenderThread::readNextReceiverCmd() { } SenderState SenderThread::readReceiverCmd() { - WVTLOG(1) << "entered READ_RECEIVER_CMD state"; + WTVLOG(1) << "entered READ_RECEIVER_CMD state"; ErrorCode errCode = readNextReceiverCmd(); if (errCode != OK) { @@ -687,7 +687,7 @@ ErrorCode SenderThread::readAndVerifySpuriousCheckpoint() { } SenderState SenderThread::processDoneCmd() { - WVTLOG(1) << "entered PROCESS_DONE_CMD state"; + WTVLOG(1) << "entered PROCESS_DONE_CMD state"; // DONE cmd implies that all the blocks sent till now is acked ThreadTransferHistory &transferHistory = getTransferHistory(); transferHistory.markAllAcknowledged(); @@ -704,7 +704,7 @@ SenderState SenderThread::processDoneCmd() { threadStats_.setLocalErrorCode(retCode); return CONNECT; } - WVTLOG(1) << "done with transfer, port " << port_; + WTVLOG(1) << "done with transfer, port " << port_; return END; } @@ -713,7 +713,7 @@ SenderState SenderThread::processWaitCmd() { // similar to DONE, WAIT also verifies all the blocks ThreadTransferHistory &transferHistory = getTransferHistory(); transferHistory.markAllAcknowledged(); - WVTLOG(1) << "received WAIT_CMD, port " << port_; + WTVLOG(1) << "received WAIT_CMD, port " << port_; return READ_RECEIVER_CMD; } @@ -812,7 +812,7 @@ SenderState SenderThread::processVersionMismatch() { // have been collected auto barrier = controller_->getBarrier(VERSION_MISMATCH_BARRIER); barrier->execute(); - WVTLOG(1) << "cleared the protocol version barrier"; + WTVLOG(1) << "cleared the protocol version barrier"; auto execFunnel = controller_->getFunnel(VERSION_MISMATCH_FUNNEL); while (true) { auto status = execFunnel->getStatus(); diff --git a/test/NetworkErrorSimulator.cpp b/test/NetworkErrorSimulator.cpp index 9cf4ba5c..048774b7 100644 --- a/test/NetworkErrorSimulator.cpp +++ b/test/NetworkErrorSimulator.cpp @@ -30,7 +30,7 @@ void simulateNetworkError() { int fd = 3 + rand32() % (2 * options.num_ports + 1); // close the chosen socket if (shutdown(fd, SHUT_WR) < 0) { - PLOG(WARNING) << "socket shutdown failed for fd " << fd; + WPLOG(WARNING) << "socket shutdown failed for fd " << fd; } else { WLOG(INFO) << "successfully shut down socket for fd " << fd; } diff --git a/test/WdtMiscTests.cpp b/test/WdtMiscTests.cpp index f3ca6c21..662bf9e9 100644 --- a/test/WdtMiscTests.cpp +++ b/test/WdtMiscTests.cpp @@ -39,7 +39,7 @@ TEST(BasicTest, MultiWdtSender) { mkdir("/tmp/wdtTest", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); char baseDir[] = "/tmp/wdtTest/XXXXXX"; if (!mkdtemp(baseDir)) { - PLOG(FATAL) << "unable to make " << baseDir; + WPLOG(FATAL) << "unable to make " << baseDir; } LOG(INFO) << "Testing in " << baseDir; string srcDir(baseDir); diff --git a/util/ClientSocket.cpp b/util/ClientSocket.cpp index 716a376b..72766819 100644 --- a/util/ClientSocket.cpp +++ b/util/ClientSocket.cpp @@ -49,7 +49,7 @@ ErrorCode ClientSocket::connect() { string portStr = folly::to(port_); int res = getaddrinfo(dest_.c_str(), portStr.c_str(), &sa_, &infoList); if (res) { - // not errno, can't use PLOG (perror) + // not errno, can't use WPLOG (perror) WLOG(ERROR) << "Failed getaddrinfo " << dest_ << " , " << port_ << " : " << res << " : " << gai_strerror(res); return CONN_ERROR; @@ -63,7 +63,7 @@ ErrorCode ClientSocket::connect() { WVLOG(2) << "will connect to " << host << " " << port; fd_ = socket(info->ai_family, info->ai_socktype, info->ai_protocol); if (fd_ == -1) { - PLOG(WARNING) << "Error making socket for port " << port_; + WPLOG(WARNING) << "Error making socket for port " << port_; continue; } WVLOG(1) << "new socket " << fd_ << " for port " << port_; @@ -75,15 +75,15 @@ ErrorCode ClientSocket::connect() { sockArg |= O_NONBLOCK; res = fcntl(fd_, F_SETFL, sockArg); if (res < 0) { - PLOG(ERROR) << "Failed to make the socket non-blocking " << port_ - << " sock " << sockArg << " res " << res; + WPLOG(ERROR) << "Failed to make the socket non-blocking " << port_ + << " sock " << sockArg << " res " << res; closeConnection(); continue; } if (::connect(fd_, info->ai_addr, info->ai_addrlen) != 0) { if (errno != EINPROGRESS) { - PLOG(INFO) << "Error connecting on " << host << " " << port; + WPLOG(INFO) << "Error connecting on " << host << " " << port; closeConnection(); continue; } @@ -123,7 +123,7 @@ ErrorCode ClientSocket::connect() { WVLOG(1) << "poll() timed out " << host << " " << port; continue; } - PLOG(ERROR) << "poll() failed " << host << " " << port << " " << fd_; + WPLOG(ERROR) << "poll() failed " << host << " " << port << " " << fd_; closeConnection(); return CONN_ERROR; } @@ -134,7 +134,7 @@ ErrorCode ClientSocket::connect() { int connectResult; socklen_t len = sizeof(connectResult); if (getsockopt(fd_, SOL_SOCKET, SO_ERROR, &connectResult, &len) < 0) { - PLOG(WARNING) << "getsockopt() failed"; + WPLOG(WARNING) << "getsockopt() failed"; closeConnection(); continue; } @@ -151,7 +151,7 @@ ErrorCode ClientSocket::connect() { sockArg &= (~O_NONBLOCK); res = fcntl(fd_, F_SETFL, sockArg); if (res == -1) { - PLOG(ERROR) << "Could not make the socket blocking " << port_; + WPLOG(ERROR) << "Could not make the socket blocking " << port_; closeConnection(); continue; } @@ -187,8 +187,8 @@ void ClientSocket::setSendBufferSize() { int status = ::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &bufSize, sizeof(bufSize)); if (status != 0) { - PLOG(ERROR) << "Failed to set send buffer " << port_ << " size " << bufSize - << " fd " << fd_; + WPLOG(ERROR) << "Failed to set send buffer " << port_ << " size " << bufSize + << " fd " << fd_; return; } WVLOG(1) << "Send buffer size set to " << bufSize << " port " << port_; diff --git a/util/DirectorySourceQueue.cpp b/util/DirectorySourceQueue.cpp index ea2816be..420d9588 100644 --- a/util/DirectorySourceQueue.cpp +++ b/util/DirectorySourceQueue.cpp @@ -170,7 +170,7 @@ DirectorySourceQueue::~DirectorySourceQueue() { if (fileData->needToClose && fileData->fd >= 0) { int ret = ::close(fileData->fd); if (ret) { - PLOG(ERROR) << "Failed to close file " << fileData->fullPath; + WPLOG(ERROR) << "Failed to close file " << fileData->fullPath; } } delete fileData; @@ -227,7 +227,7 @@ string DirectorySourceQueue::resolvePath(const string &path) { string result; char *resolvedPath = realpath(path.c_str(), nullptr); if (!resolvedPath) { - PLOG(ERROR) << "Couldn't resolve " << path; + WPLOG(ERROR) << "Couldn't resolve " << path; return result; // empty string == error } result.assign(resolvedPath); @@ -262,7 +262,7 @@ bool DirectorySourceQueue::explore() { WVLOG(1) << "Processing directory " << fullPath; DIR *dirPtr = opendir(fullPath.c_str()); if (!dirPtr) { - PLOG(ERROR) << "Error opening dir " << fullPath; + WPLOG(ERROR) << "Error opening dir " << fullPath; failedDirectories_.emplace_back(fullPath); hasError = true; continue; @@ -279,7 +279,7 @@ bool DirectorySourceQueue::explore() { dirEntryRes = readdir(dirPtr); if (!dirEntryRes) { if (errno) { - PLOG(ERROR) << "Error reading dir " << fullPath; + WPLOG(ERROR) << "Error reading dir " << fullPath; // closedir always called hasError = true; } else { @@ -321,7 +321,7 @@ bool DirectorySourceQueue::explore() { // On XFS we don't know yet if this is a symlink, so check // if following symlinks is ok we will do stat() too if (lstat(newFullPath.c_str(), &fileStat) != 0) { - PLOG(ERROR) << "lstat() failed on path " << newFullPath; + WPLOG(ERROR) << "lstat() failed on path " << newFullPath; hasError = true; continue; } @@ -331,7 +331,7 @@ bool DirectorySourceQueue::explore() { // Use stat to see if the pointed file is of the right type // (overrides previous stat call result) if (stat(newFullPath.c_str(), &fileStat) != 0) { - PLOG(ERROR) << "stat() failed on path " << newFullPath; + WPLOG(ERROR) << "stat() failed on path " << newFullPath; hasError = true; continue; } @@ -560,7 +560,7 @@ bool DirectorySourceQueue::enqueueFiles() { if (info.fileSize < 0) { struct stat fileStat; if (stat(fullPath.c_str(), &fileStat) != 0) { - PLOG(ERROR) << "stat failed on path " << fullPath; + WPLOG(ERROR) << "stat failed on path " << fullPath; return false; } info.fileSize = fileStat.st_size; diff --git a/util/FileByteSource.cpp b/util/FileByteSource.cpp index 4ce5ac16..98660de8 100644 --- a/util/FileByteSource.cpp +++ b/util/FileByteSource.cpp @@ -38,7 +38,7 @@ int FileUtil::openForRead(ThreadCtx &threadCtx, const std::string &filename, << "for " << filename; int ret = fcntl(fd, F_NOCACHE, 1); if (ret) { - PLOG(ERROR) << "Not able to set F_NOCACHE"; + WPLOG(ERROR) << "Not able to set F_NOCACHE"; } #else WDT_CHECK(false) @@ -48,7 +48,7 @@ int FileUtil::openForRead(ThreadCtx &threadCtx, const std::string &filename, #endif } } else { - PLOG(ERROR) << "Error opening file " << filename; + WPLOG(ERROR) << "Error opening file " << filename; } return fd; } @@ -124,11 +124,11 @@ char *FileByteSource::read(int64_t &size) { numRead = ::pread(fd_, buffer->getData(), physicalRead, seekPos); } if (numRead < 0) { - PLOG(ERROR) << "Failure while reading file " << metadata_->fullPath - << " need align " << alignedReadNeeded_ << " physicalRead " - << physicalRead << " offset " << offset_ << " seepPos " - << seekPos << " offsetRemainder " << offsetRemainder - << " bytesRead " << bytesRead_; + WPLOG(ERROR) << "Failure while reading file " << metadata_->fullPath + << " need align " << alignedReadNeeded_ << " physicalRead " + << physicalRead << " offset " << offset_ << " seepPos " + << seekPos << " offsetRemainder " << offsetRemainder + << " bytesRead " << bytesRead_; this->close(); transferStats_.setLocalErrorCode(BYTE_SOURCE_READ_ERROR); return nullptr; @@ -173,8 +173,8 @@ void FileByteSource::clearPageCache() { if (bytesRead_ > 0 && !options.skip_fadvise) { PerfStatCollector statCollector(*threadCtx_, PerfStatReport::FADVISE); if (posix_fadvise(fd_, offset_, bytesRead_, POSIX_FADV_DONTNEED) != 0) { - PLOG(ERROR) << "posix_fadvise failed for " << getIdentifier() << " " - << offset_ << " " << bytesRead_; + WPLOG(ERROR) << "posix_fadvise failed for " << getIdentifier() << " " + << offset_ << " " << bytesRead_; } } #endif diff --git a/util/FileCreator.cpp b/util/FileCreator.cpp index 4f89302c..11a43c90 100644 --- a/util/FileCreator.cpp +++ b/util/FileCreator.cpp @@ -21,7 +21,7 @@ namespace wdt { bool FileCreator::setFileSize(ThreadCtx &threadCtx, int fd, int64_t fileSize) { struct stat fileStat; if (fstat(fd, &fileStat) != 0) { - PLOG(ERROR) << "fstat() failed for " << fd; + WPLOG(ERROR) << "fstat() failed for " << fd; return false; } if (fileStat.st_size > fileSize) { @@ -29,7 +29,7 @@ bool FileCreator::setFileSize(ThreadCtx &threadCtx, int fd, int64_t fileSize) { int64_t sizeToTruncate = (threadCtx.getOptions().shouldPreallocateFiles() ? fileSize : 0); if (ftruncate(fd, sizeToTruncate) != 0) { - PLOG(ERROR) << "ftruncate() failed for " << fd << " " << sizeToTruncate; + WPLOG(ERROR) << "ftruncate() failed for " << fd << " " << sizeToTruncate; return false; } } @@ -134,7 +134,7 @@ int FileCreator::openForBlocks(ThreadCtx &threadCtx, status = ::unlink(path.c_str()); } if (status != 0) { - PLOG(ERROR) << "Failed to delete file " << path; + WPLOG(ERROR) << "Failed to delete file " << path; } else { WLOG(INFO) << "Successfully deleted file " << path; } @@ -190,7 +190,7 @@ int FileCreator::openExistingFile(ThreadCtx &threadCtx, res = open(path.c_str(), openFlags, 0644); } if (res < 0) { - PLOG(ERROR) << "failed opening file " << path; + WPLOG(ERROR) << "failed opening file " << path; return -1; } WVLOG(1) << "successfully opened file " << path; @@ -259,11 +259,11 @@ int FileCreator::createFile(ThreadCtx &threadCtx, const string &relPathStr) { } if (res < 0) { if (dir.empty()) { - PLOG(ERROR) << "failed creating file " << path; + WPLOG(ERROR) << "failed creating file " << path; return -1; } - PLOG(ERROR) << "failed creating file " << path << ", trying to " - << "force directory creation"; + WPLOG(ERROR) << "failed creating file " << path << ", trying to " + << "force directory creation"; bool dirSuccess; { PerfStatCollector statCollector(threadCtx, @@ -279,7 +279,7 @@ int FileCreator::createFile(ThreadCtx &threadCtx, const string &relPathStr) { res = open(path.c_str(), openFlags, 0644); } if (res < 0) { - PLOG(ERROR) << "failed creating file " << path; + WPLOG(ERROR) << "failed creating file " << path; return -1; } } @@ -313,7 +313,7 @@ bool FileCreator::createDirRecursively(const std::string dir, bool force) { std::string fullDirPath = getFullPath(dir); int code = mkdir(fullDirPath.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); if (code != 0 && errno != EEXIST && errno != EISDIR) { - PLOG(ERROR) << "failed to make directory " << fullDirPath; + WPLOG(ERROR) << "failed to make directory " << fullDirPath; return false; } else if (code != 0) { WLOG(INFO) << "dir already exists " << fullDirPath; diff --git a/util/FileWriter.cpp b/util/FileWriter.cpp index 14ba04d4..68b8fb55 100644 --- a/util/FileWriter.cpp +++ b/util/FileWriter.cpp @@ -35,8 +35,8 @@ ErrorCode FileWriter::open() { ret = lseek(fd_, blockDetails_->offset, SEEK_SET); } if (ret < 0) { - PLOG(ERROR) << "Unable to seek to " << blockDetails_->offset << " for " - << blockDetails_->fileName; + WPLOG(ERROR) << "Unable to seek to " << blockDetails_->offset << " for " + << blockDetails_->fileName; close(); } } @@ -51,7 +51,7 @@ void FileWriter::close() { if (fd_ >= 0) { PerfStatCollector statCollector(threadCtx_, PerfStatReport::FILE_CLOSE); if (::close(fd_) != 0) { - PLOG(ERROR) << "Unable to close fd " << fd_; + WPLOG(ERROR) << "Unable to close fd " << fd_; } fd_ = -1; } @@ -74,9 +74,9 @@ ErrorCode FileWriter::write(char *buf, int64_t size) { << blockDetails_->fileName; continue; } - PLOG(ERROR) << "File write failed for " << blockDetails_->fileName - << "fd : " << fd_ << " " << written << " " << count << " " - << size; + WPLOG(ERROR) << "File write failed for " << blockDetails_->fileName + << "fd : " << fd_ << " " << written << " " << count << " " + << size; return FILE_WRITE_ERROR; } count += written; @@ -87,10 +87,10 @@ ErrorCode FileWriter::write(char *buf, int64_t size) { if (finished && (options.isLogBasedResumption() || options.fsync)) { PerfStatCollector statCollector(threadCtx_, PerfStatReport::FSYNC_STATS); if (fsync(fd_) != 0) { - PLOG(ERROR) << "fsync failed for " << blockDetails_->fileName - << " offset " << blockDetails_->offset << " file-size " - << blockDetails_->fileSize << " data-size " - << blockDetails_->dataSize; + WPLOG(ERROR) << "fsync failed for " << blockDetails_->fileName + << " offset " << blockDetails_->offset << " file-size " + << blockDetails_->fileSize << " data-size " + << blockDetails_->dataSize; return FILE_WRITE_ERROR; } WVLOG(1) << "File " << blockDetails_->fileName << " fsync'ed"; @@ -102,9 +102,9 @@ ErrorCode FileWriter::write(char *buf, int64_t size) { PerfStatCollector statCollector(threadCtx_, PerfStatReport::FADVISE); if (posix_fadvise(fd_, blockDetails_->offset, blockDetails_->dataSize, POSIX_FADV_DONTNEED) != 0) { - PLOG(ERROR) << "posix_fadvise failed for " << blockDetails_->fileName - << " " << blockDetails_->offset << " " - << blockDetails_->dataSize; + WPLOG(ERROR) << "posix_fadvise failed for " << blockDetails_->fileName + << " " << blockDetails_->offset << " " + << blockDetails_->dataSize; } } #endif @@ -140,8 +140,8 @@ void FileWriter::syncFileRange(int64_t written, bool forced) { SYNC_FILE_RANGE_WRITE); } if (status != 0) { - PLOG(ERROR) << "sync_file_range() failed for " << blockDetails_->fileName - << "fd " << fd_; + WPLOG(ERROR) << "sync_file_range() failed for " << blockDetails_->fileName + << "fd " << fd_; return; } WVLOG(1) << "file range [" << nextSyncOffset_ << " " diff --git a/util/SerializationUtil.cpp b/util/SerializationUtil.cpp index 95aa3a7e..a450f8dd 100644 --- a/util/SerializationUtil.cpp +++ b/util/SerializationUtil.cpp @@ -107,8 +107,8 @@ bool encodeString(char *dest, int64_t sz, int64_t &off, const string &str) { } const int64_t strLen = str.length(); if ((off + strLen) > sz) { - LOG(ERROR) << "Not enough room to encode \"" << str << "\" in buf of size " - << sz; + WLOG(ERROR) << "Not enough room to encode \"" << str << "\" in buf of size " + << sz; return false; } memcpy(dest + off, str.data(), strLen); diff --git a/util/SerializationUtil.h b/util/SerializationUtil.h index e8549403..084ae89e 100644 --- a/util/SerializationUtil.h +++ b/util/SerializationUtil.h @@ -113,7 +113,7 @@ inline bool encodeVarU64(char *data, size_t datasz, int64_t &pos, uint64_t v) { while ((++count < 9) && (v >= 128)) { #if WDT_EDI64_DO_CHECKS if (p >= end) { - LOG(WARNING) << "not enough space to store full value"; + WLOG(WARNING) << "not enough space to store full value"; return false; } #endif @@ -122,7 +122,7 @@ inline bool encodeVarU64(char *data, size_t datasz, int64_t &pos, uint64_t v) { } #if WDT_EDI64_DO_CHECKS if (p >= end) { - LOG(WARNING) << "not enough space to store full value"; + WLOG(WARNING) << "not enough space to store full value"; return false; } #endif @@ -164,7 +164,7 @@ inline bool decodeVarU64(const char *data, size_t datalen, int64_t &pos, uint64_t &res) { #if WDT_EDI64_DO_CHECKS if (pos < 0) { - LOG(WARNING) << "negative writing offset " << pos; + WLOG(WARNING) << "negative writing offset " << pos; return false; } #endif @@ -175,8 +175,8 @@ inline bool decodeVarU64(const char *data, size_t datalen, int64_t &pos, int count = 0; #if WDT_EDI64_DO_CHECKS if (p >= end) { - LOG(WARNING) << "not enough space to store full value at start, l=" - << datalen << " p=" << pos; + WLOG(WARNING) << "not enough space to store full value at start, l=" + << datalen << " p=" << pos; return false; } #endif @@ -185,7 +185,7 @@ inline bool decodeVarU64(const char *data, size_t datalen, int64_t &pos, shift += 7; #if WDT_EDI64_DO_CHECKS if (p >= end) { - LOG(WARNING) << "not enough space to store full value l=" << datalen; + WLOG(WARNING) << "not enough space to store full value l=" << datalen; return false; } #endif diff --git a/util/ServerSocket.cpp b/util/ServerSocket.cpp index c441f9e9..ef6ebb8f 100644 --- a/util/ServerSocket.cpp +++ b/util/ServerSocket.cpp @@ -33,7 +33,7 @@ void ServerSocket::closeAllNoCheck() { if (listeningFd >= 0) { int ret = ::close(listeningFd); if (ret != 0) { - PLOG(ERROR) + WPLOG(ERROR) << "Error closing listening fd for server socket. listeningFd: " << listeningFd << " port: " << port_; } @@ -53,29 +53,30 @@ int ServerSocket::listenInternal(struct addrinfo *info, int listeningFd = socket(info->ai_family, info->ai_socktype, info->ai_protocol); if (listeningFd == -1) { - PLOG(WARNING) << "Error making server socket " << host << " " << port_; + WPLOG(WARNING) << "Error making server socket " << host << " " << port_; return -1; } setReceiveBufferSize(listeningFd); int optval = 1; if (setsockopt(listeningFd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) != 0) { - PLOG(ERROR) << "Unable to set SO_REUSEADDR option " << host << " " << port_; + WPLOG(ERROR) << "Unable to set SO_REUSEADDR option " << host << " " + << port_; } if (info->ai_family == AF_INET6) { // for ipv6 address, turn on ipv6 only flag if (setsockopt(listeningFd, IPPROTO_IPV6, IPV6_V6ONLY, &optval, sizeof(optval)) != 0) { - PLOG(ERROR) << "Unable to set IPV6_V6ONLY flag " << host << " " << port_; + WPLOG(ERROR) << "Unable to set IPV6_V6ONLY flag " << host << " " << port_; } } if (bind(listeningFd, info->ai_addr, info->ai_addrlen)) { - PLOG(WARNING) << "Error binding " << host << " " << port_; + WPLOG(WARNING) << "Error binding " << host << " " << port_; ::close(listeningFd); return -1; } if (::listen(listeningFd, backlog_)) { - PLOG(ERROR) << "listen error for port " << host << " " << port_; + WPLOG(ERROR) << "listen error for port " << host << " " << port_; ::close(listeningFd); return -1; } @@ -90,7 +91,7 @@ int ServerSocket::getSelectedPortAndNewAddress(int listeningFd, struct sockaddr_in sin; socklen_t len = sizeof(sin); if (getsockname(listeningFd, (struct sockaddr *)&sin, &len) == -1) { - PLOG(ERROR) << "getsockname failed " << host; + WPLOG(ERROR) << "getsockname failed " << host; return -1; } port = ntohs(sin.sin_port); @@ -136,7 +137,7 @@ ErrorCode ServerSocket::listen() { std::string portStr = folly::to(port_); int res = getaddrinfo(nullptr, portStr.c_str(), &sa, &infoList); if (res) { - // not errno, can't use PLOG (perror) + // not errno, can't use WPLOG (perror) WLOG(ERROR) << "Failed getaddrinfo ai_passive on " << port_ << " : " << res << " : " << gai_strerror(res); return CONN_ERROR; @@ -248,8 +249,8 @@ ErrorCode ServerSocket::acceptNextConnection(int timeoutMillis, << ", listening fds : " << listeningFds_; continue; } - PLOG(ERROR) << "poll() failed on port : " << port_ - << ", listening fds : " << listeningFds_; + WPLOG(ERROR) << "poll() failed on port : " << port_ + << ", listening fds : " << listeningFds_; return CONN_ERROR; } @@ -268,7 +269,7 @@ ErrorCode ServerSocket::acceptNextConnection(int timeoutMillis, socklen_t addrLen = sizeof(addr); fd_ = accept(pollFd.fd, (struct sockaddr *)&addr, &addrLen); if (fd_ < 0) { - PLOG(ERROR) << "accept error"; + WPLOG(ERROR) << "accept error"; return CONN_ERROR; } getNameInfo((struct sockaddr *)&addr, addrLen, peerIp_, peerPort_); @@ -291,8 +292,8 @@ void ServerSocket::setReceiveBufferSize(int fd) { int status = ::setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &bufSize, sizeof(bufSize)); if (status != 0) { - PLOG(ERROR) << "Failed to set receive buffer " << port_ << " size " - << bufSize << " fd " << fd; + WPLOG(ERROR) << "Failed to set receive buffer " << port_ << " size " + << bufSize << " fd " << fd; return; } WVLOG(1) << "Receive buffer size set to " << bufSize << " port " << port_; diff --git a/util/ThreadsController.cpp b/util/ThreadsController.cpp index cc82c0a7..da1119ae 100644 --- a/util/ThreadsController.cpp +++ b/util/ThreadsController.cpp @@ -31,8 +31,8 @@ void ConditionGuardImpl::wait(int timeoutMillis, const ThreadCtx &threadCtx) { } // check for abort if (threadCtx.getAbortChecker()->shouldAbort()) { - LOG(ERROR) << "Transfer aborted during condition guard wait " - << threadCtx.getThreadIndex(); + WLOG(ERROR) << "Transfer aborted during condition guard wait " + << threadCtx.getThreadIndex(); return; } remainingTime -= waitTime; diff --git a/util/TransferLogManager.cpp b/util/TransferLogManager.cpp index 9779cc3e..1d9c1608 100644 --- a/util/TransferLogManager.cpp +++ b/util/TransferLogManager.cpp @@ -252,15 +252,15 @@ ErrorCode TransferLogManager::openLog() { fd_ = ::open(logPath.c_str(), O_RDWR); if (fd_ < 0) { if (errno != ENOENT) { - PLOG(ERROR) << "Could not open wdt log " << logPath; + WPLOG(ERROR) << "Could not open wdt log " << logPath; return TRANSFER_LOG_ACQUIRE_ERROR; } else { // creation of the log path (which can still be a race) WLOG(INFO) << logPath << " doesn't exist... creating..."; fd_ = ::open(logPath.c_str(), O_CREAT | O_EXCL, 0644); if (fd_ < 0) { - PLOG(WARNING) << "Could not create wdt log (maybe ok if race): " - << logPath; + WPLOG(WARNING) << "Could not create wdt log (maybe ok if race): " + << logPath; } else { // On windows/cygwin for instance the flock will silently succeed yet // not lock on a newly created file... workaround is to close and reopen @@ -268,16 +268,16 @@ ErrorCode TransferLogManager::openLog() { } fd_ = ::open(logPath.c_str(), O_RDWR); if (fd_ < 0) { - PLOG(ERROR) << "Still couldn't open wdt log after create attempt: " - << logPath; + WPLOG(ERROR) << "Still couldn't open wdt log after create attempt: " + << logPath; return TRANSFER_LOG_ACQUIRE_ERROR; } } } // try to acquire file lock if (::flock(fd_, LOCK_EX | LOCK_NB) != 0) { - PLOG(ERROR) << "Failed to acquire transfer log lock " << logPath << " " - << fd_; + WPLOG(ERROR) << "Failed to acquire transfer log lock " << logPath << " " + << fd_; close(); return TRANSFER_LOG_ACQUIRE_ERROR; } @@ -305,13 +305,13 @@ ErrorCode TransferLogManager::checkLog() { const string fullLogName = getFullPath(kWdtLogName); struct stat stat1, stat2; if (stat(fullLogName.c_str(), &stat1)) { - PLOG(ERROR) << "CORRUPTION! Can't stat log file " << fullLogName - << " (deleted under us)"; + WPLOG(ERROR) << "CORRUPTION! Can't stat log file " << fullLogName + << " (deleted under us)"; exit(TRANSFER_LOG_ACQUIRE_ERROR); return ERROR; } if (fstat(fd_, &stat2)) { - PLOG(ERROR) << "Unable to stat log by fd " << fd_; + WPLOG(ERROR) << "Unable to stat log by fd " << fd_; exit(TRANSFER_LOG_ACQUIRE_ERROR); return ERROR; } @@ -332,7 +332,7 @@ void TransferLogManager::close() { } checkLog(); if (::close(fd_) != 0) { - PLOG(ERROR) << "Failed to close wdt log " << fd_; + WPLOG(ERROR) << "Failed to close wdt log " << fd_; } else { WLOG(INFO) << "Transfer log closed"; } @@ -389,8 +389,8 @@ void TransferLogManager::writeEntriesToDisk() { int64_t toWrite = buffer.size(); int64_t written = ::write(fd_, buffer.c_str(), toWrite); if (written != toWrite) { - PLOG(ERROR) << "Disk write error while writing transfer log " << written - << " " << toWrite; + WPLOG(ERROR) << "Disk write error while writing transfer log " << written + << " " << toWrite; return; } } @@ -425,7 +425,7 @@ bool TransferLogManager::verifySenderIp(const string &curSenderIp) { void TransferLogManager::fsync() { WDT_CHECK(fd_ >= 0); if (::fsync(fd_) != 0) { - PLOG(ERROR) << "fsync failed for transfer log " << fd_; + WPLOG(ERROR) << "fsync failed for transfer log " << fd_; } } @@ -440,7 +440,7 @@ void TransferLogManager::invalidateDirectory() { encoderDecoder_.encodeDirectoryInvalidationEntry(buf, sizeof(buf)); int64_t written = ::write(fd_, buf, size); if (written != size) { - PLOG(ERROR) + WPLOG(ERROR) << "Disk write error while writing directory invalidation entry " << written << " " << size; closeLog(); @@ -461,8 +461,8 @@ void TransferLogManager::writeLogHeader() { buf, kMaxEntryLength, recoveryId_, senderIp_, config_); int64_t written = ::write(fd_, buf, size); if (written != size) { - PLOG(ERROR) << "Disk write error while writing log header " << written - << " " << size; + WPLOG(ERROR) << "Disk write error while writing log header " << written + << " " << size; closeLog(); return; } @@ -532,7 +532,7 @@ void TransferLogManager::unlink() { WLOG(INFO) << "unlinking " << kWdtLogName; string fullLogName = getFullPath(kWdtLogName); if (::unlink(fullLogName.c_str()) != 0) { - PLOG(ERROR) << "Could not unlink " << fullLogName; + WPLOG(ERROR) << "Could not unlink " << fullLogName; } } @@ -541,8 +541,8 @@ void TransferLogManager::renameBuggyLog() { WLOG(INFO) << "Renaming " << kWdtLogName << " to " << kWdtBuggyLogName; if (::rename(getFullPath(kWdtLogName).c_str(), getFullPath(kWdtBuggyLogName).c_str()) != 0) { - PLOG(ERROR) << "log rename failed " << kWdtLogName << " " - << kWdtBuggyLogName; + WPLOG(ERROR) << "log rename failed " << kWdtLogName << " " + << kWdtBuggyLogName; } return; } @@ -608,9 +608,9 @@ bool LogParser::writeFileInvalidationEntries(int fd, encoderDecoder_.encodeFileInvalidationEntry(buf, sizeof(buf), seqId); int64_t written = ::write(fd, buf, size); if (written != size) { - PLOG(ERROR) << "Disk write error while writing invalidation entry to " - "transfer log " - << written << " " << size; + WPLOG(ERROR) << "Disk write error while writing invalidation entry to " + "transfer log " + << written << " " << size; return false; } } @@ -622,18 +622,18 @@ bool LogParser::truncateExtraBytesAtEnd(int fd, int64_t extraBytes) { << " bytes from the end of transfer log"; struct stat statBuffer; if (fstat(fd, &statBuffer) != 0) { - PLOG(ERROR) << "fstat failed on fd " << fd; + WPLOG(ERROR) << "fstat failed on fd " << fd; return false; } off_t fileSize = statBuffer.st_size; if (::ftruncate(fd, fileSize - extraBytes) != 0) { - PLOG(ERROR) << "ftruncate failed for fd " << fd; + WPLOG(ERROR) << "ftruncate failed for fd " << fd; return false; } // ftruncate does not change the offset, so change the offset to the end of // the log if (::lseek(fd, fileSize - extraBytes, SEEK_SET) < 0) { - PLOG(ERROR) << "lseek failed for fd " << fd; + WPLOG(ERROR) << "lseek failed for fd " << fd; return false; } return true; @@ -744,7 +744,7 @@ ErrorCode LogParser::processFileCreationEntry(char *buf, int64_t size) { string fullPath; folly::toAppend(rootDir_, fileName, &fullPath); if (stat(fullPath.c_str(), &buffer) != 0) { - PLOG(ERROR) << "stat failed for " << fileName; + WPLOG(ERROR) << "stat failed for " << fileName; } else { if (options_.shouldPreallocateFiles()) { sizeVerificationSuccess = (buffer.st_size >= fileSize); @@ -919,8 +919,8 @@ ErrorCode LogParser::parseLog(int fd, string &senderIp, int64_t toRead = sizeof(int16_t); int64_t numRead = ::read(fd, &entrySize, toRead); if (numRead < 0) { - PLOG(ERROR) << "Error while reading transfer log " << numRead << " " - << toRead; + WPLOG(ERROR) << "Error while reading transfer log " << numRead << " " + << toRead; return INVALID_LOG; } if (numRead == 0) { @@ -944,8 +944,8 @@ ErrorCode LogParser::parseLog(int fd, string &senderIp, } numRead = ::read(fd, entry, entrySize); if (numRead < 0) { - PLOG(ERROR) << "Error while reading transfer log " << numRead << " " - << entrySize; + WPLOG(ERROR) << "Error while reading transfer log " << numRead << " " + << entrySize; return INVALID_LOG; } if (numRead != entrySize) { diff --git a/util/WdtSocket.cpp b/util/WdtSocket.cpp index 3234be87..9583e963 100644 --- a/util/WdtSocket.cpp +++ b/util/WdtSocket.cpp @@ -253,8 +253,8 @@ int64_t WdtSocket::ioWithAbortCheck(F readOrWrite, T tbuf, int64_t numBytes, if (ret < 0) { // error if (errno != EINTR && errno != EAGAIN) { - PLOG(ERROR) << "non-retryable error encountered during socket io " - << fd_ << " " << doneBytes << " " << retries; + WPLOG(ERROR) << "non-retryable error encountered during socket io " + << fd_ << " " << doneBytes << " " << retries; return (doneBytes > 0 ? doneBytes : ret); } } else if (ret == 0) { @@ -296,7 +296,7 @@ ErrorCode WdtSocket::shutdownWrites() { ErrorCode code = finalizeWrites(true); if (::shutdown(fd_, SHUT_WR) < 0) { if (code == OK) { - PLOG(WARNING) << "Socket shutdown failed for fd " << fd_; + WPLOG(WARNING) << "Socket shutdown failed for fd " << fd_; code = ERROR; } } @@ -346,7 +346,7 @@ ErrorCode WdtSocket::finalizeWrites(bool doTagIOs) { const int timeoutMs = threadCtx_.getOptions().write_timeout_millis; const int expected = tag.size(); if (writeInternal(tag.data(), tag.size(), timeoutMs, false) != expected) { - PLOG(ERROR) << "Encryption Tag write error"; + WPLOG(ERROR) << "Encryption Tag write error"; code = ENCRYPTION_ERROR; } } @@ -374,7 +374,7 @@ ErrorCode WdtSocket::closeConnectionInternal(bool doTagIOs) { errorCode = getMoreInterestingError(errorCode, finalizeReads(doTagIOs)); } if (::close(fd_) != 0) { - PLOG(ERROR) << "Failed to close socket " << fd_ << " " << port_; + WPLOG(ERROR) << "Failed to close socket " << fd_ << " " << port_; errorCode = getMoreInterestingError(ERROR, errorCode); } // This looks like a reset() make it explicit (and check it's complete) @@ -454,7 +454,7 @@ void WdtSocket::setSocketTimeouts() { tv.tv_usec = (readTimeout % 1000) * 1000; // milli to micro if (setsockopt(fd_, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(struct timeval)) != 0) { - PLOG(ERROR) << "Unable to set read timeout for " << port_ << " " << fd_; + WPLOG(ERROR) << "Unable to set read timeout for " << port_ << " " << fd_; } } int writeTimeout = @@ -465,7 +465,7 @@ void WdtSocket::setSocketTimeouts() { tv.tv_usec = (writeTimeout % 1000) * 1000; // milli to micro if (setsockopt(fd_, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv, sizeof(struct timeval)) != 0) { - PLOG(ERROR) << "Unable to set write timeout for " << port_ << " " << fd_; + WPLOG(ERROR) << "Unable to set write timeout for " << port_ << " " << fd_; } } } @@ -508,7 +508,7 @@ int WdtSocket::getUnackedBytes() const { ret = ::ioctl(fd_, SIOCOUTQ, &numUnackedBytes); } if (ret != 0) { - PLOG(ERROR) << "Failed to get unacked bytes for socket " << fd_; + WPLOG(ERROR) << "Failed to get unacked bytes for socket " << fd_; numUnackedBytes = -1; } return numUnackedBytes;