diff --git a/src/chunkserver/chunkserver_entry.cc b/src/chunkserver/chunkserver_entry.cc index 54c94c6ef..59ae62ed7 100644 --- a/src/chunkserver/chunkserver_entry.cc +++ b/src/chunkserver/chunkserver_entry.cc @@ -155,7 +155,7 @@ ChunkserverEntry::createDetachedPacketWithOutputBuffer( getReadOutputBufferPool().put(std::move(outPacket->outputBuffer)); } - return nullptr; + return kInvalidPacket; } return outPacket; @@ -236,22 +236,22 @@ int ChunkserverEntry::initConnection() { fwdSocket = tcpsocket(); if (fwdSocket < 0) { safs_pretty_errlog(LOG_WARNING, "create socket, error"); - return -1; + return kInitConnectionFailed; } if (tcpnonblock(fwdSocket) < 0) { safs_pretty_errlog(LOG_WARNING, "set nonblock, error"); tcpclose(fwdSocket); - fwdSocket = -1; - return -1; + fwdSocket = kInvalidSocket; + return kInitConnectionFailed; } status = tcpnumconnect(fwdSocket, fwdServer.ip, fwdServer.port); if (status < 0) { safs_pretty_errlog(LOG_WARNING, "connect failed, error"); tcpclose(fwdSocket); - fwdSocket = -1; - return -1; + fwdSocket = kInvalidSocket; + return kInitConnectionFailed; } if (status == 0) { // connected immediately @@ -262,17 +262,17 @@ int ChunkserverEntry::initConnection() { connectStartTimeUSec = eventloop_utime(); } - return 0; + return kInitConnectionOK; } void ChunkserverEntry::retryConnect() { TRACETHIS(); tcpclose(fwdSocket); - fwdSocket = -1; + fwdSocket = kInvalidSocket; connectRetryCounter++; if (connectRetryCounter < kConnectRetries) { - if (initConnection() < 0) { + if (initConnection() < kInitConnectionOK) { fwdError(); return; } @@ -373,7 +373,7 @@ void ChunkserverEntry::readContinue() { messageSerializer->serializePrefixOfCstoclReadData( readDataPrefix, chunkId, offset, thisPartSize); auto packet = createDetachedPacketWithOutputBuffer(readDataPrefix); - if (packet == nullptr) { + if (packet == kInvalidPacket) { state = State::Close; return; } @@ -408,7 +408,9 @@ void ChunkserverEntry::readContinue() { } void ChunkserverEntry::ping(const uint8_t *data, PacketHeader::Length length) { - if (length != 4) { + static constexpr uint32_t kExpectedPingSize = sizeof(uint32_t); + + if (length != kExpectedPingSize) { state = State::Close; return; } @@ -636,7 +638,7 @@ void ChunkserverEntry::writeInit(const uint8_t *data, PacketHeader::Type type, fwdStartPtr = fwdInitPacket.data(); fwdBytesLeft = fwdInitPacket.size(); connectRetryCounter = 0; - if (initConnection() < 0) { + if (initConnection() < kInitConnectionOK) { std::vector buffer; messageSerializer->serializeCstoclWriteStatus( buffer, chunkId, 0, SAUNAFS_ERROR_CANTCONNECT); @@ -829,7 +831,7 @@ void ChunkserverEntry::writeEnd(const uint8_t *data, uint32_t length) { // TODO(msulikowski) if we want to use a ConnectionPool, this the right // place to put the connection to the pool. tcpclose(fwdSocket); - fwdSocket = -1; + fwdSocket = kInvalidSocket; } inputPacket.useAlignedMemory = false; state = State::Idle; @@ -916,7 +918,7 @@ void ChunkserverEntry::hddListV2([[maybe_unused]] const uint8_t *data, uint32_t opSize; uint8_t *ptr; - if (length != 0) { + if (length != 0) { // This packet should not have any data safs_pretty_syslog(LOG_NOTICE, "CLTOCS_HDD_LIST_V2 - wrong size (%" PRIu32 "/0)", length); @@ -950,9 +952,9 @@ void ChunkserverEntry::generateChartPNGorCSV(const uint8_t *data, uint8_t *ptr; uint32_t len; - if (length != 4) { - safs_pretty_syslog(LOG_NOTICE, - "CLTOAN_CHART - wrong size (%" PRIu32 "/4)", length); + if (length != kGenerateChartExpectedPacketSize) { + safs::log_info("CLTOAN_CHART - wrong size ({}/{})", length, + kGenerateChartExpectedPacketSize); state = State::Close; return; } @@ -978,10 +980,9 @@ void ChunkserverEntry::generateChartData(const uint8_t *data, uint32_t length) { uint8_t *ptr; uint32_t len; - if (length != 4) { - safs_pretty_syslog(LOG_NOTICE, - "CLTOAN_CHART_DATA - wrong size (%" PRIu32 "/4)", - length); + if (length != kGenerateChartExpectedPacketSize) { + safs::log_info("CLTOAN_CHART_DATA - wrong size ({}/{})", length, + kGenerateChartExpectedPacketSize); state = State::Close; return; } @@ -1253,7 +1254,7 @@ void ChunkserverEntry::fwdRead() { if (fwdInputPacket.bytesLeft > 0) { return; } - ptr = fwdHeaderBuffer + 4; + ptr = fwdHeaderBuffer + sizeof(PacketHeader::Type); // skip type opSize = get32bit(&ptr); if (opSize > kMaxPacketSize) { safs_pretty_syslog(LOG_WARNING, diff --git a/src/chunkserver/chunkserver_entry.h b/src/chunkserver/chunkserver_entry.h index db2bc8b01..1555b42f3 100644 --- a/src/chunkserver/chunkserver_entry.h +++ b/src/chunkserver/chunkserver_entry.h @@ -47,6 +47,9 @@ constexpr uint32_t kIOAlignedPacketSize = disk::kIoBlockSize + SFSBLOCKSIZE; constexpr uint32_t kIOAlignedOffset = disk::kIoBlockSize - cltocs::writeData::kPrefixSize; +// Alias for better readability +#define kInvalidPacket nullptr + /** * @brief Encapsulates the data associated with a packet. * @@ -109,6 +112,13 @@ struct ChunkserverEntry { Closed // ready to be deleted }; + // Some constants to improve readability + static constexpr int kInvalidSocket = -1; + static constexpr int kInitConnectionOK = 0; + static constexpr int kInitConnectionFailed = -1; + static constexpr uint32_t kGenerateChartExpectedPacketSize = + sizeof(uint32_t); + void* workerJobPool; // Job pool assigned to a given network worker thread ChunkserverEntry::State state = ChunkserverEntry::State::Idle; @@ -116,7 +126,7 @@ struct ChunkserverEntry { ChunkserverEntry::Mode fwdMode = ChunkserverEntry::Mode::Header; int sock; - int fwdSocket = -1; ///< forwarding socket for writing + int fwdSocket = kInvalidSocket; ///< forwarding socket for writing uint64_t connectStartTimeUSec = 0; ///< for timeout and retry (usec) uint8_t connectRetryCounter = 0; ///< for timeout and retry NetworkAddress fwdServer; // the next server in write chain diff --git a/src/chunkserver/network_worker_thread.cc b/src/chunkserver/network_worker_thread.cc index b46cc32f9..3ba2a1bb7 100644 --- a/src/chunkserver/network_worker_thread.cc +++ b/src/chunkserver/network_worker_thread.cc @@ -59,7 +59,10 @@ NetworkWorkerThread::NetworkWorkerThread(uint32_t nrOfBgjobsWorkers, TRACETHIS(); eassert(pipe(notify_pipe) != -1); #ifdef F_SETPIPE_SZ - eassert(fcntl(notify_pipe[1], F_SETPIPE_SZ, 4096 * 32)); + // Increase the pipe size to 128 KiB to handle a larger number of jobs + // without backpressure. On modern linux, the default pipe size is 64 KiB. + static constexpr int kPageAlignedPipeSize = 4096 * 32; + eassert(fcntl(notify_pipe[1], F_SETPIPE_SZ, kPageAlignedPipeSize)); #endif bgJobPool_ = job_pool_new(nrOfBgjobsWorkers, bgjobsCount, &bgJobPoolWakeUpFd_);