Skip to content

Commit

Permalink
Fixed a bug in the state transition logic of the worker tasks
Browse files Browse the repository at this point in the history
In the previous code, the task were declared as "finished" after
being "booted" from a scheduler. In reality, the tasks could be
still in the active state processing the corresponding queries
and consuming resources (memory, CPU, MySQL connections).
This change made the task monitoring correct and allowed
to uncover tasks that were previously seen as "invisible".

The startup sequence of the worker tasks has been modified in the same
way to declare the task processing at a point where the actual
processing begins.
  • Loading branch information
iagaponenko committed Nov 1, 2023
1 parent d2346bc commit 25fa289
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 118 deletions.
6 changes: 4 additions & 2 deletions src/wbase/Task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ std::vector<Task::Ptr> Task::createTasks(std::shared_ptr<proto::TaskMsg> const&
std::shared_ptr<wdb::ChunkResourceMgr> const& chunkResourceMgr,
mysql::MySqlConfig const& mySqlConfig,
std::shared_ptr<wcontrol::SqlConnMgr> const& sqlConnMgr,
std::shared_ptr<wpublish::QueriesAndChunks> const& queriesAndChunks,
uint16_t resultsHttpPort) {
QueryId qId = taskMsg->queryid();
QSERV_LOGCONTEXT_QUERY_JOB(qId, taskMsg->jobid());
Expand Down Expand Up @@ -267,8 +268,9 @@ std::vector<Task::Ptr> Task::createTasks(std::shared_ptr<proto::TaskMsg> const&
}
for (auto task : vect) {
/// Set the function called when it is time to process the task.
auto func = [task, chunkResourceMgr, mySqlConfig, sqlConnMgr](util::CmdData*) {
auto qr = wdb::QueryRunner::newQueryRunner(task, chunkResourceMgr, mySqlConfig, sqlConnMgr);
auto func = [task, chunkResourceMgr, mySqlConfig, sqlConnMgr, queriesAndChunks](util::CmdData*) {
auto qr = wdb::QueryRunner::newQueryRunner(task, chunkResourceMgr, mySqlConfig, sqlConnMgr,
queriesAndChunks);
bool success = false;
try {
success = qr->runQuery();
Expand Down
4 changes: 3 additions & 1 deletion src/wbase/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ namespace lsst::qserv::wdb {
class ChunkResourceMgr;
}
namespace lsst::qserv::wpublish {
class QueriesAndChunks;
class QueryStatistics;
}
} // namespace lsst::qserv::wpublish

namespace lsst::qserv::wbase {

Expand Down Expand Up @@ -167,6 +168,7 @@ class Task : public util::CommandForThreadPool {
std::shared_ptr<wdb::ChunkResourceMgr> const& chunkResourceMgr,
mysql::MySqlConfig const& mySqlConfig,
std::shared_ptr<wcontrol::SqlConnMgr> const& sqlConnMgr,
std::shared_ptr<wpublish::QueriesAndChunks> const& queriesAndChunks,
uint16_t resultsHttpPort = 8080);

void setQueryStatistics(std::shared_ptr<wpublish::QueryStatistics> const& qC);
Expand Down
43 changes: 22 additions & 21 deletions src/wdb/QueryRunner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <cstddef>
#include <iostream>
#include <memory>
#include <thread>

// Third-party headers
#include <google/protobuf/arena.h>
Expand Down Expand Up @@ -82,8 +83,10 @@ namespace lsst::qserv::wdb {
QueryRunner::Ptr QueryRunner::newQueryRunner(wbase::Task::Ptr const& task,
ChunkResourceMgr::Ptr const& chunkResourceMgr,
mysql::MySqlConfig const& mySqlConfig,
shared_ptr<wcontrol::SqlConnMgr> const& sqlConnMgr) {
Ptr qr(new QueryRunner(task, chunkResourceMgr, mySqlConfig, sqlConnMgr)); // Private constructor.
shared_ptr<wcontrol::SqlConnMgr> const& sqlConnMgr,
shared_ptr<wpublish::QueriesAndChunks> const& queriesAndChunks) {
Ptr qr(new QueryRunner(task, chunkResourceMgr, mySqlConfig, sqlConnMgr,
queriesAndChunks)); // Private constructor.
// Let the Task know this is its QueryRunner.
bool cancelled = qr->_task->setTaskQueryRunner(qr);
if (cancelled) {
Expand All @@ -97,11 +100,13 @@ QueryRunner::Ptr QueryRunner::newQueryRunner(wbase::Task::Ptr const& task,
/// and correct setup of enable_shared_from_this.
QueryRunner::QueryRunner(wbase::Task::Ptr const& task, ChunkResourceMgr::Ptr const& chunkResourceMgr,
mysql::MySqlConfig const& mySqlConfig,
shared_ptr<wcontrol::SqlConnMgr> const& sqlConnMgr)
shared_ptr<wcontrol::SqlConnMgr> const& sqlConnMgr,
shared_ptr<wpublish::QueriesAndChunks> const& queriesAndChunks)
: _task(task),
_chunkResourceMgr(chunkResourceMgr),
_mySqlConfig(mySqlConfig),
_sqlConnMgr(sqlConnMgr) {
_sqlConnMgr(sqlConnMgr),
_queriesAndChunks(queriesAndChunks) {
[[maybe_unused]] int rc = mysql_thread_init();
assert(rc == 0);
}
Expand Down Expand Up @@ -162,17 +167,26 @@ bool QueryRunner::runQuery() {
throw util::Bug(ERR_LOC, "runQuery called twice");
}

// Start tracking the task.
_queriesAndChunks->startedTask(_task);

// Make certain our Task knows that this object is no longer in use when this function exits.
class Release {
public:
Release(wbase::Task::Ptr t, wbase::TaskQueryRunner* tqr) : _t{t}, _tqr{tqr} {}
~Release() { _t->freeTaskQueryRunner(_tqr); }
Release(wbase::Task::Ptr t, wbase::TaskQueryRunner* tqr,
shared_ptr<wpublish::QueriesAndChunks> const& queriesAndChunks)
: _t{t}, _tqr{tqr}, _queriesAndChunks(queriesAndChunks) {}
~Release() {
_queriesAndChunks->finishedTask(_t);
_t->freeTaskQueryRunner(_tqr);
}

private:
wbase::Task::Ptr _t;
wbase::TaskQueryRunner* _tqr;
shared_ptr<wpublish::QueriesAndChunks> const _queriesAndChunks;
};
Release release(_task, this);
Release release(_task, this, _queriesAndChunks);

if (_task->checkCancelled()) {
LOGS(_log, LOG_LVL_DEBUG, "runQuery, task was cancelled before it started.");
Expand Down Expand Up @@ -258,7 +272,7 @@ class ChunkResourceRequest {
}

private:
shared_ptr<ChunkResourceMgr> _mgr;
shared_ptr<ChunkResourceMgr> const _mgr;
wbase::Task& _task;
};

Expand Down Expand Up @@ -323,19 +337,6 @@ bool QueryRunner::_dispatchChannel() {
// Pass all information on to the shared object to add on to
// an existing message or build a new one as needed.
erred = _task->getSendChannel()->buildAndTransmitResult(res, _task, _multiError, _cancelled);

// ATTENTION: This call is needed to record the _actual_ completion time of the task.
// It rewrites the finish timestamp within the task that was made when the task got
// kicked off the scheduler (see the code block above where a value of _removedFromThreadPool
// gets tested) which is happening shortly after MySQL query finishes and before the data
// transmission to Czar starts.
// NOTE: Tasks would stay in the task "cemetery" (class wpublish::QueriesAndChunks)
// for about 5 minutes after they finish transmitting data. After that no info on
// the task is available.
// TODO: Investigate an option for recording state transitions of the persistent
// metadata store of the worker, or keeping the state transisitons in a separate transient
// store that won't be affected by the task destruction.
_task->finished(std::chrono::system_clock::now());
}
} catch (sql::SqlErrorObject const& e) {
LOGS(_log, LOG_LVL_ERROR, "dispatchChannel " << e.errMsg());
Expand Down
26 changes: 14 additions & 12 deletions src/wdb/QueryRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,17 @@
#include "wbase/Task.h"
#include "wdb/ChunkResource.h"

namespace lsst::qserv {

namespace xrdsvc {
namespace lsst::qserv::xrdsvc {
class StreamBuffer;
}
} // namespace lsst::qserv::xrdsvc

namespace wcontrol {
namespace lsst::qserv::wcontrol {
class SqlConnMgr;
} // namespace wcontrol
} // namespace lsst::qserv::wcontrol

} // namespace lsst::qserv
namespace lsst::qserv::wpublish {
class QueriesAndChunks;
} // namespace lsst::qserv::wpublish

namespace lsst::qserv::wdb {

Expand All @@ -64,10 +64,10 @@ namespace lsst::qserv::wdb {
class QueryRunner : public wbase::TaskQueryRunner, public std::enable_shared_from_this<QueryRunner> {
public:
using Ptr = std::shared_ptr<QueryRunner>;
static QueryRunner::Ptr newQueryRunner(wbase::Task::Ptr const& task,
ChunkResourceMgr::Ptr const& chunkResourceMgr,
mysql::MySqlConfig const& mySqlConfig,
std::shared_ptr<wcontrol::SqlConnMgr> const& sqlConnMgr);
static QueryRunner::Ptr newQueryRunner(
wbase::Task::Ptr const& task, ChunkResourceMgr::Ptr const& chunkResourceMgr,
mysql::MySqlConfig const& mySqlConfig, std::shared_ptr<wcontrol::SqlConnMgr> const& sqlConnMgr,
std::shared_ptr<wpublish::QueriesAndChunks> const& queriesAndChunks);
// Having more than one copy of this would making tracking its progress difficult.
QueryRunner(QueryRunner const&) = delete;
QueryRunner& operator=(QueryRunner const&) = delete;
Expand All @@ -86,7 +86,8 @@ class QueryRunner : public wbase::TaskQueryRunner, public std::enable_shared_fro
protected:
QueryRunner(wbase::Task::Ptr const& task, ChunkResourceMgr::Ptr const& chunkResourceMgr,
mysql::MySqlConfig const& mySqlConfig,
std::shared_ptr<wcontrol::SqlConnMgr> const& sqlConnMgr);
std::shared_ptr<wcontrol::SqlConnMgr> const& sqlConnMgr,
std::shared_ptr<wpublish::QueriesAndChunks> const& queriesAndChunks);

private:
bool _initConnection();
Expand Down Expand Up @@ -115,6 +116,7 @@ class QueryRunner : public wbase::TaskQueryRunner, public std::enable_shared_fro

/// Used to limit the number of open MySQL connections.
std::shared_ptr<wcontrol::SqlConnMgr> const _sqlConnMgr;
std::shared_ptr<wpublish::QueriesAndChunks> const _queriesAndChunks;
std::atomic<bool> _runQueryCalled{false}; ///< If runQuery gets called twice, the scheduler messed up.
};

Expand Down
16 changes: 12 additions & 4 deletions src/wdb/testQueryRunner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "wcontrol/TransmitMgr.h"
#include "wdb/ChunkResource.h"
#include "wdb/QueryRunner.h"
#include "wpublish/QueriesAndChunks.h"

// Boost unit test header
#define BOOST_TEST_MODULE QueryRunner
Expand Down Expand Up @@ -70,6 +71,7 @@ using lsst::qserv::wdb::ChunkResource;
using lsst::qserv::wdb::ChunkResourceMgr;
using lsst::qserv::wdb::FakeBackend;
using lsst::qserv::wdb::QueryRunner;
using lsst::qserv::wpublish::QueriesAndChunks;

TransmitMgr::Ptr locTransmitMgr = make_shared<TransmitMgr>(50, 4);

Expand Down Expand Up @@ -100,6 +102,10 @@ struct Fixture {
}
return mySqlConfig;
}
shared_ptr<QueriesAndChunks> queriesAndChunks() {
bool resetForTesting = true;
return QueriesAndChunks::setupGlobal(chrono::seconds(1), chrono::seconds(300), 5, resetForTesting);
}
};

BOOST_FIXTURE_TEST_SUITE(Basic, Fixture)
Expand All @@ -112,9 +118,10 @@ BOOST_AUTO_TEST_CASE(Simple) {
FakeBackend::Ptr backend = make_shared<FakeBackend>();
shared_ptr<ChunkResourceMgr> crm = ChunkResourceMgr::newMgr(backend);
SqlConnMgr::Ptr sqlConnMgr = make_shared<SqlConnMgr>(20, 15);
auto taskVect = Task::createTasks(msg, sc, crm, newMySqlConfig(), sqlConnMgr);
auto const queries = queriesAndChunks();
auto taskVect = Task::createTasks(msg, sc, crm, newMySqlConfig(), sqlConnMgr, queries);
Task::Ptr task = taskVect[0];
QueryRunner::Ptr a(QueryRunner::newQueryRunner(task, crm, newMySqlConfig(), sqlConnMgr));
QueryRunner::Ptr a(QueryRunner::newQueryRunner(task, crm, newMySqlConfig(), sqlConnMgr, queries));
BOOST_CHECK(a->runQuery());
}

Expand All @@ -127,9 +134,10 @@ BOOST_AUTO_TEST_CASE(Output) {
FakeBackend::Ptr backend = make_shared<FakeBackend>();
shared_ptr<ChunkResourceMgr> crm = ChunkResourceMgr::newMgr(backend);
SqlConnMgr::Ptr sqlConnMgr = make_shared<SqlConnMgr>(20, 15);
auto taskVect = Task::createTasks(msg, sc, crm, newMySqlConfig(), sqlConnMgr);
auto const queries = queriesAndChunks();
auto taskVect = Task::createTasks(msg, sc, crm, newMySqlConfig(), sqlConnMgr, queries);
Task::Ptr task = taskVect[0];
QueryRunner::Ptr a(QueryRunner::newQueryRunner(task, crm, newMySqlConfig(), sqlConnMgr));
QueryRunner::Ptr a(QueryRunner::newQueryRunner(task, crm, newMySqlConfig(), sqlConnMgr, queries));
BOOST_CHECK(a->runQuery());

unsigned char phSize = *reinterpret_cast<unsigned char const*>(out.data());
Expand Down
3 changes: 0 additions & 3 deletions src/wsched/BlendScheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,6 @@ void BlendScheduler::commandStart(util::Command::Ptr const& cmd) {
} else {
LOGS(_log, LOG_LVL_ERROR, "BlendScheduler::commandStart scheduler not found");
}

_queries->startedTask(t);
_infoChanged = true;
}

Expand All @@ -277,7 +275,6 @@ void BlendScheduler::commandFinish(util::Command::Ptr const& cmd) {
}
_infoChanged = true;
_logChunkStatus();
_queries->finishedTask(t);
notify(true); // notify all=true
}

Expand Down
Loading

0 comments on commit 25fa289

Please sign in to comment.