Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tickets/dm 43139 #851

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 35 additions & 33 deletions src/wbase/FileChannelShared.cc
Original file line number Diff line number Diff line change
Expand Up @@ -267,11 +267,7 @@ shared_ptr<FileChannelShared> FileChannelShared::create(shared_ptr<wbase::SendCh

FileChannelShared::FileChannelShared(shared_ptr<wbase::SendChannel> const& sendChannel, qmeta::CzarId czarId,
string const& workerId)
: _sendChannel(sendChannel),
_czarId(czarId),
_workerId(workerId),
_protobufArena(make_unique<google::protobuf::Arena>()),
_scsId(scsSeqId++) {
: _sendChannel(sendChannel), _czarId(czarId), _workerId(workerId), _scsId(scsSeqId++) {
LOGS(_log, LOG_LVL_DEBUG, "FileChannelShared created");
if (_sendChannel == nullptr) {
throw util::Bug(ERR_LOC, "FileChannelShared constructor given nullptr");
Expand All @@ -283,7 +279,7 @@ FileChannelShared::~FileChannelShared() {
// dead it means there was a problem to process a query or send back a response
// to Czar. In either case, the file would be useless and it has to be deleted
// in order to avoid leaving unclaimed result files within the results folder.
if (isDead()) {
if (_issueRequiresFileRemoval || isDead()) {
_removeFile(lock_guard<mutex>(_tMtx));
}
if (_sendChannel != nullptr) {
Expand Down Expand Up @@ -321,8 +317,9 @@ string FileChannelShared::makeIdStr(int qId, int jId) {

bool FileChannelShared::buildAndTransmitError(util::MultiError& multiErr, shared_ptr<Task> const& task,
bool cancelled) {
std::unique_ptr<google::protobuf::Arena> protobufArena = make_unique<google::protobuf::Arena>();
lock_guard<mutex> const tMtxLock(_tMtx);
if (!_sendResponse(tMtxLock, task, cancelled, multiErr)) {
if (!_sendResponse(tMtxLock, protobufArena, task, cancelled, multiErr)) {
LOGS(_log, LOG_LVL_ERROR, "Could not transmit the error message to Czar.");
return false;
}
Expand All @@ -348,19 +345,18 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr<Ta
bool erred = false;
bool hasMoreRows = true;

while (hasMoreRows && !cancelled) {
// This lock is to protect the stream from having other Tasks mess with it
// while data is loading.
lock_guard<mutex> const tMtxLockA(_tMtx);
std::unique_ptr<google::protobuf::Arena> protobufArena = make_unique<google::protobuf::Arena>();
proto::ResponseData* responseData = nullptr;

while (hasMoreRows && !cancelled) {
util::Timer bufferFillT;
bufferFillT.start();

// Transfer as many rows as it's allowed by limitations of
// the Google Protobuf into the output file.
int bytes = 0;
int rows = 0;
hasMoreRows = _writeToFile(tMtxLockA, task, mResult, bytes, rows, multiErr);
hasMoreRows = _writeToFile(responseData, protobufArena, task, mResult, bytes, rows, multiErr);
bytesTransmitted += bytes;
rowsTransmitted += rows;
_rowcount += rows;
Expand Down Expand Up @@ -390,13 +386,15 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr<Ta
// the current request (note that certain classes of requests may require
// more than one task for processing).
if (!hasMoreRows && transmitTaskLast()) {
lock_guard<mutex> const tMtxLock(_tMtx);

// Make sure the file is synced to disk before notifying Czar.
_file.flush();
_file.close();

// Only the last ("summary") message, w/o any rows, is sent to the Czar to notify
// it about the completion of the request.
if (!_sendResponse(tMtxLockA, task, cancelled, multiErr)) {
if (!_sendResponse(tMtxLock, protobufArena, task, cancelled, multiErr)) {
LOGS(_log, LOG_LVL_ERROR, "Could not transmit the request completion message to Czar.");
erred = true;
break;
Expand All @@ -421,8 +419,9 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr<Ta
// successfully processing the query and writing all results into the file.
// The file is not going to be used by Czar in either of these scenarios.
if (cancelled || erred || isDead()) {
lock_guard<mutex> const tMtxLockA(_tMtx);
_removeFile(tMtxLockA);
// Set a flag to delete the file in the destructor. That should prevent any
// possible race conditions with other threads expecting the file to exist.
_issueRequiresFileRemoval = true;
}
return erred;
}
Expand All @@ -432,28 +431,29 @@ bool FileChannelShared::_kill(lock_guard<mutex> const& streamMutexLock, string c
return _sendChannel->kill(note);
}

bool FileChannelShared::_writeToFile(lock_guard<mutex> const& tMtxLock, shared_ptr<Task> const& task,
MYSQL_RES* mResult, int& bytes, int& rows, util::MultiError& multiErr) {
bool FileChannelShared::_writeToFile(proto::ResponseData* responseData,
unique_ptr<google::protobuf::Arena> const& protobufArena,
shared_ptr<Task> const& task, MYSQL_RES* mResult, int& bytes, int& rows,
util::MultiError& multiErr) {
// Transfer rows from a result set into the response data object.
if (nullptr == _responseData) {
_responseData = google::protobuf::Arena::CreateMessage<proto::ResponseData>(_protobufArena.get());
if (nullptr == responseData) {
responseData = google::protobuf::Arena::CreateMessage<proto::ResponseData>(protobufArena.get());
} else {
_responseData->clear_row();
responseData->clear_row();
}
size_t tSize = 0;
LOGS(_log, LOG_LVL_TRACE, __func__ << " _fillRows " << task->getIdStr() << " start");
bool const hasMoreRows = _fillRows(tMtxLock, mResult, rows, tSize);
LOGS(_log, LOG_LVL_TRACE, __func__ << " _fillRows " << task->getIdStr() << " end");
_responseData->set_rowcount(rows);
_responseData->set_transmitsize(tSize);
bool const hasMoreRows = _fillRows(responseData, mResult, rows, tSize);
responseData->set_rowcount(rows);
responseData->set_transmitsize(tSize);

// Serialize the content of the data buffer into the Protobuf data message
// that will be written into the output file.
std::string msg;
_responseData->SerializeToString(&msg);
responseData->SerializeToString(&msg);
bytes = msg.size();

LOGS(_log, LOG_LVL_TRACE, __func__ << " file write " << task->getIdStr() << " start");
lock_guard<mutex> const tMtxLock(_tMtx);
// Create the file if not open.
if (!_file.is_open()) {
_fileName = task->resultFilePath();
Expand All @@ -478,7 +478,7 @@ bool FileChannelShared::_writeToFile(lock_guard<mutex> const& tMtxLock, shared_p
return hasMoreRows;
}

bool FileChannelShared::_fillRows(lock_guard<mutex> const& tMtxLock, MYSQL_RES* mResult, int& rows,
bool FileChannelShared::_fillRows(proto::ResponseData* responseData, MYSQL_RES* mResult, int& rows,
size_t& tSize) {
int const numFields = mysql_num_fields(mResult);
unsigned int szLimit = min(proto::ProtoHeaderWrap::PROTOBUFFER_DESIRED_LIMIT,
Expand All @@ -488,7 +488,7 @@ bool FileChannelShared::_fillRows(lock_guard<mutex> const& tMtxLock, MYSQL_RES*
MYSQL_ROW row;
while ((row = mysql_fetch_row(mResult))) {
auto lengths = mysql_fetch_lengths(mResult);
proto::RowBundle* rawRow = _responseData->add_row();
proto::RowBundle* rawRow = responseData->add_row();
for (int i = 0; i < numFields; ++i) {
if (row[i]) {
rawRow->add_column(row[i], lengths[i]);
Expand Down Expand Up @@ -521,8 +521,10 @@ void FileChannelShared::_removeFile(lock_guard<mutex> const& tMtxLock) {
}
}

bool FileChannelShared::_sendResponse(lock_guard<mutex> const& tMtxLock, shared_ptr<Task> const& task,
bool cancelled, util::MultiError const& multiErr) {
bool FileChannelShared::_sendResponse(lock_guard<mutex> const& tMtxLock,
std::unique_ptr<google::protobuf::Arena> const& protobufArena,
shared_ptr<Task> const& task, bool cancelled,
util::MultiError const& multiErr) {
auto const queryId = task->getQueryId();
auto const jobId = task->getJobId();
auto const idStr(makeIdStr(queryId, jobId));
Expand All @@ -534,10 +536,10 @@ bool FileChannelShared::_sendResponse(lock_guard<mutex> const& tMtxLock, shared_
// This will deallocate any memory managed by the Google Protobuf Arena
// to avoid unnecessary memory utilization by the application.
LOGS(_log, LOG_LVL_DEBUG,
__func__ << ": Google Protobuf Arena, 1:SpaceUsed=" << _protobufArena->SpaceUsed());
_protobufArena->Reset();
__func__ << ": Google Protobuf Arena, 1:SpaceUsed=" << protobufArena->SpaceUsed());
protobufArena->Reset();
LOGS(_log, LOG_LVL_DEBUG,
__func__ << ": Google Protobuf Arena, 2:SpaceUsed=" << _protobufArena->SpaceUsed());
__func__ << ": Google Protobuf Arena, 2:SpaceUsed=" << protobufArena->SpaceUsed());

QSERV_LOGCONTEXT_QUERY_JOB(queryId, jobId);
LOGS(_log, LOG_LVL_DEBUG, __func__);
Expand Down
30 changes: 18 additions & 12 deletions src/wbase/FileChannelShared.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,10 @@ class FileChannelShared {
* @note The method may not extract all rows if the amount of data found
* in the result set exceeded the maximum size allowed by the Google Protobuf
* implementation. Also, the iterative approach to the data extraction allows
* the driving code to be interrupted should the correponding query be cancelled
* the driving code to be interrupted should the corresponding query be cancelled
* during the lengthy data processing phase.
* @param tMtxLock - a lock on the mutex tMtx
* @param responseData - proto buffer to hold the response being constructed.
* @param protobufArena - proto buffer memory management control.
* @param task - a task that produced the result set
* @param mResult - MySQL result to be used as a source
* @param bytes - the number of bytes in the result message recorded into the file
Expand All @@ -192,19 +193,23 @@ class FileChannelShared {
* @throws std::runtime_error for problems encountered when attempting to create the file
* or write into the file.
*/
bool _writeToFile(std::lock_guard<std::mutex> const& tMtxLock, std::shared_ptr<Task> const& task,
MYSQL_RES* mResult, int& bytes, int& rows, util::MultiError& multiErr);
bool _writeToFile(proto::ResponseData* responseData,
std::unique_ptr<google::protobuf::Arena> const& protobufArena,
std::shared_ptr<Task> const& task, MYSQL_RES* mResult, int& bytes, int& rows,
util::MultiError& multiErr);

/**
* Extract as many rows as allowed by the Google Protobuf implementation
* from the input result set into the output result object.
* @param tMtxLock - a lock on the mutex tMtx
* @param responseData - proto buffer to hold the response being constructed.
* @param protobufArena - proto buffer memory management control.
* @param mResult - MySQL result to be used as a source
* @param rows - the number of rows extracted from the result set
* @param tSize - the approximate amount of data extracted from the result set
* @return 'true' if there are more rows left in the result set.
*/
bool _fillRows(std::lock_guard<std::mutex> const& tMtxLock, MYSQL_RES* mResult, int& rows, size_t& tSize);
static bool _fillRows(proto::ResponseData* responseData, MYSQL_RES* mResult, int& rows, size_t& tSize);

/**
* Unconditionally close and remove the (potentially partially written) file.
* This method gets called in case of any failure detected while processing
Expand All @@ -225,19 +230,16 @@ class FileChannelShared {
* @param multiErr - a collector of any errors that were captured during result set processing
* @return 'true' if the operation was successful
*/
bool _sendResponse(std::lock_guard<std::mutex> const& tMtxLock, std::shared_ptr<Task> const& task,
bool cancelled, util::MultiError const& multiErr);
bool _sendResponse(std::lock_guard<std::mutex> const& tMtxLock,
std::unique_ptr<google::protobuf::Arena> const& protobufArena,
std::shared_ptr<Task> const& task, bool cancelled, util::MultiError const& multiErr);

mutable std::mutex _tMtx; ///< Protects data recording and Czar notification

std::shared_ptr<wbase::SendChannel> const _sendChannel; ///< Used to pass encoded information to XrdSsi.
qmeta::CzarId const _czarId; ///< id of the czar that requested this task(s).
std::string const _workerId; ///< The unique identifier of the worker.

// Allocatons/deletion of the data messages are managed by Google Protobuf Arena.
std::unique_ptr<google::protobuf::Arena> _protobufArena;
proto::ResponseData* _responseData = 0;

uint64_t const _scsId; ///< id number for this FileChannelShared

/// streamMutex is used to protect _lastCount and messages that are sent
Expand Down Expand Up @@ -272,6 +274,10 @@ class FileChannelShared {

uint32_t _rowcount = 0; ///< The total number of rows in all result sets of a query.
uint64_t _transmitsize = 0; ///< The total amount of data (bytes) in all result sets of a query.

/// This should be set to true if there were any issues that invalidate the file, such as errors
/// or cancellation.
std::atomic<bool> _issueRequiresFileRemoval{false};
};

} // namespace lsst::qserv::wbase
Expand Down
Loading