Skip to content

Commit

Permalink
Merge branch 'tickets/DM-41358'
Browse files Browse the repository at this point in the history
  • Loading branch information
iagaponenko committed Nov 1, 2023
2 parents 434e406 + bb5a505 commit d2346bc
Show file tree
Hide file tree
Showing 11 changed files with 243 additions and 53 deletions.
3 changes: 3 additions & 0 deletions src/wbase/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,9 @@ class Task : public util::CommandForThreadPool {
/// Return a reference to the list of subchunk ids.
const IntVector& getSubchunksVect() const { return _dbTblsAndSubchunks->subchunksVect; }

/// Return an identifier of the corresponding MySQL query (if any was set).
unsigned long getMySqlThreadId() const { return _mysqlThreadId.load(); }

/// Set MySQL thread associated with a MySQL connection open before executing
/// task's queries. The identifier is sampled by the worker tasks monitoring
/// system in order to see what MySQL queries are being executed by tasks.
Expand Down
1 change: 1 addition & 0 deletions src/wcontrol/Foreman.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ class Foreman : public wbase::MsgProcessor {

std::shared_ptr<wdb::ChunkResourceMgr> const& chunkResourceMgr() const { return _chunkResourceMgr; }
mysql::MySqlConfig const& mySqlConfig() const { return _mySqlConfig; }
std::shared_ptr<wpublish::QueriesAndChunks> const& queriesAndChunks() const { return _queries; }
std::shared_ptr<wcontrol::SqlConnMgr> const& sqlConnMgr() const { return _sqlConnMgr; }
std::shared_ptr<wcontrol::TransmitMgr> const& transmitMgr() const { return _transmitMgr; }
uint16_t httpPort() const;
Expand Down
5 changes: 4 additions & 1 deletion src/wdb/QueryRunner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
#include "util/threadSafe.h"
#include "wbase/Base.h"
#include "wbase/ChannelShared.h"
#include "wconfig/WorkerConfig.h"
#include "wcontrol/SqlConnMgr.h"
#include "wdb/ChunkResource.h"
#include "wpublish/QueriesAndChunks.h"
Expand Down Expand Up @@ -299,7 +300,9 @@ bool QueryRunner::_dispatchChannel() {

// This thread may have already been removed from the pool for
// other reasons, such as taking too long.
if (not _removedFromThreadPool) {
bool const streamingProtocol = wconfig::WorkerConfig::instance()->resultDeliveryProtocol() ==
wconfig::WorkerConfig::ResultDeliveryProtocol::SSI;
if (streamingProtocol && !_removedFromThreadPool) {
// This query has been answered by the database and the
// scheduler for this worker should stop waiting for it.
// leavePool() will tell the scheduler this task is finished
Expand Down
20 changes: 18 additions & 2 deletions src/wpublish/GetDbStatusCommand.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "wpublish/GetDbStatusCommand.h"

// System headers
#include <set>
#include <stdexcept>

// Third party headers
Expand All @@ -34,6 +35,7 @@
#include "proto/worker.pb.h"
#include "wbase/SendChannel.h"
#include "wconfig/WorkerConfig.h"
#include "wpublish/QueriesAndChunks.h"

// LSST headers
#include "lsst/log/Log.h"
Expand All @@ -49,8 +51,9 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.wpublish.GetDbStatusCommand");

namespace lsst::qserv::wpublish {

GetDbStatusCommand::GetDbStatusCommand(shared_ptr<wbase::SendChannel> const& sendChannel)
: wbase::WorkerCommand(sendChannel) {}
GetDbStatusCommand::GetDbStatusCommand(shared_ptr<wbase::SendChannel> const& sendChannel,
shared_ptr<QueriesAndChunks> const& queriesAndChunks)
: wbase::WorkerCommand(sendChannel), _queriesAndChunks(queriesAndChunks) {}

void GetDbStatusCommand::run() {
string const context = "GetDbStatusCommand::" + string(__func__);
Expand All @@ -65,6 +68,19 @@ void GetDbStatusCommand::run() {
reportError<proto::WorkerCommandGetDbStatusR>(ex.what());
return;
}

// Amend the result with a map linking MySQL thread identifiers to the corresponding
// tasks that are being (or have been) processed by the worker. Note that only a subset
// of tasks is selected for the known MySQL threads. This prevents the monitoring
// system from pulling old tasks that may still keep records of the closed threads.
set<unsigned long> activeMySqlThreadIds;
for (auto const& row : result["queries"]["rows"]) {
// The thread identifier is stored as a string at the very first element
// of the array. See mysql::MySqlUtils::processList for details.
activeMySqlThreadIds.insert(stoul(row[0].get<string>()));
}
result["mysql_thread_to_task"] = _queriesAndChunks->mySqlThread2task(activeMySqlThreadIds);

proto::WorkerCommandGetDbStatusR reply;
reply.mutable_status();
reply.set_info(result.dump());
Expand Down
11 changes: 10 additions & 1 deletion src/wpublish/GetDbStatusCommand.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ namespace lsst::qserv::wbase {
class SendChannel;
} // namespace lsst::qserv::wbase

// Forward declarations
namespace lsst::qserv::wpublish {
class QueriesAndChunks;
} // namespace lsst::qserv::wpublish

// This header declarations
namespace lsst::qserv::wpublish {

Expand All @@ -45,7 +50,8 @@ class GetDbStatusCommand : public wbase::WorkerCommand {
/**
* @param sendChannel The communication channel for reporting results.
*/
GetDbStatusCommand(std::shared_ptr<wbase::SendChannel> const& sendChannel);
GetDbStatusCommand(std::shared_ptr<wbase::SendChannel> const& sendChannel,
std::shared_ptr<QueriesAndChunks> const& queriesAndChunks);

GetDbStatusCommand() = delete;
GetDbStatusCommand& operator=(GetDbStatusCommand const&) = delete;
Expand All @@ -55,6 +61,9 @@ class GetDbStatusCommand : public wbase::WorkerCommand {

protected:
virtual void run() override;

private:
std::shared_ptr<QueriesAndChunks> const _queriesAndChunks;
};

} // namespace lsst::qserv::wpublish
Expand Down
23 changes: 23 additions & 0 deletions src/wpublish/QueriesAndChunks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,29 @@ nlohmann::json QueriesAndChunks::statusToJson(wbase::TaskSelector const& taskSel
return status;
}

nlohmann::json QueriesAndChunks::mySqlThread2task(set<unsigned long> const& activeMySqlThreadIds) const {
nlohmann::json result = nlohmann::json::object();
lock_guard<mutex> g(_queryStatsMtx);
for (auto&& itr : _queryStats) {
QueryStatistics::Ptr const& qStats = itr.second;
for (auto&& task : qStats->_tasks) {
auto const threadId = task->getMySqlThreadId();
if ((threadId != 0) && activeMySqlThreadIds.contains(threadId)) {
// Force the identifier to be converted into a string because the JSON library
// doesn't support numeric keys in its dictionary class.
result[to_string(threadId)] =
nlohmann::json::object({{"query_id", task->getQueryId()},
{"job_id", task->getJobId()},
{"chunk_id", task->getChunkId()},
{"subchunk_id", task->getSubchunkId()},
{"template_id", task->getTemplateId()},
{"state", wbase::taskState2str(task->state())}});
}
}
}
return result;
}

/// @return a map that contains time totals for all chunks for tasks running on specific
/// tables. The map is sorted by table name and contains sub-maps ordered by chunk id.
/// The sub-maps contain information about how long tasks take to complete on that table
Expand Down
10 changes: 9 additions & 1 deletion src/wpublish/QueriesAndChunks.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <memory>
#include <mutex>
#include <ostream>
#include <set>
#include <string>
#include <vector>

Expand Down Expand Up @@ -235,12 +236,19 @@ class QueriesAndChunks {
void examineAll();

/**
* Retreive monitoring data for teh worker.
* Retreive monitoring data for the worker.
* @param taskSelector Task selection criterias.
* @return a JSON representation of the object's status for the monitoring
*/
nlohmann::json statusToJson(wbase::TaskSelector const& taskSelector) const;

/**
* Retrieve info on tasks that are associated with the specified MySQL threads.
* @param activeMySqlThreadIds A collection of the MySQL threads.
* @return a JSON object linking the threads to the corresponding tasks.
*/
nlohmann::json mySqlThread2task(std::set<unsigned long> const& activeMySqlThreadIds) const;

// Figure out each chunkTable's percentage of time.
// Store average time for a task to run on this table for this chunk.
struct ChunkTimePercent {
Expand Down
3 changes: 3 additions & 0 deletions src/www/qserv/css/QservMySQLConnections.css
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,6 @@ table#fwk-qserv-mysql-connections > thead > tr > th.sticky {
top:80px;
z-index:2;
}
table#fwk-qserv-mysql-connections tbody > tr.display-worker-queries:hover {
cursor:pointer;
}
48 changes: 28 additions & 20 deletions src/www/qserv/js/QservMySQLConnections.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,32 +136,40 @@ function(CSSLoader,
* Display MySQL connections
*/
_display(data) {
const queryInspectTitle = "Click to see MySQL queries runing on the worker's MySQL server.";
let html = '';
for (let worker in data) {
if (!data[worker].success) {
html += `
<tr>
<th class="table-warning">${worker}</th>
<td class="table-secondary">&nbsp;</td>
<td class="table-secondary">&nbsp;</td>
<td class="table-secondary">&nbsp;</td>
<td class="table-secondary">&nbsp;</td>
<td class="table-secondary">&nbsp;</td>
</tr>`;
} else {
let totalCount = '';
let sqlScanConnCount = '';
let maxSqlScanConnections = '';
let sqlSharedConnCount = '';
let maxSqlSharedConnections = '';
if (data[worker].success) {
let sql_conn_mgr = data[worker].info.processor.sql_conn_mgr;
html += `
<tr>
totalCount = sql_conn_mgr.totalCount;
sqlScanConnCount = sql_conn_mgr.sqlScanConnCount;
maxSqlScanConnections = sql_conn_mgr.maxSqlScanConnections;
sqlSharedConnCount = sql_conn_mgr.sqlSharedConnCount;
maxSqlSharedConnections = sql_conn_mgr.maxSqlSharedConnections;
}
html += `
<tr worker="${worker}" class="display-worker-queries" title="${queryInspectTitle}">
<th>${worker}</th>
<td style="text-align:right;"><pre>${sql_conn_mgr.totalCount}</pre></td>
<td style="text-align:right;"><pre>${sql_conn_mgr.sqlScanConnCount}</pre></td>
<td style="text-align:right;"><pre>${sql_conn_mgr.maxSqlScanConnections}</pre></td>
<td style="text-align:right;"><pre>${sql_conn_mgr.sqlSharedConnCount}</pre></td>
<td style="text-align:right;"><pre>${sql_conn_mgr.maxSqlSharedConnections}</pre></td>
<td style="text-align:right;"><pre>${totalCount}</pre></td>
<td style="text-align:right;"><pre>${sqlScanConnCount}</pre></td>
<td style="text-align:right;"><pre>${maxSqlScanConnections}</pre></td>
<td style="text-align:right;"><pre>${sqlSharedConnCount}</pre></td>
<td style="text-align:right;"><pre>${maxSqlSharedConnections}</pre></td>
</tr>`;
}
}
this._table().children('tbody').html(html);
let tbody = this._table().children('tbody');
tbody.html(html);
let displayWorkerQueries = function(e) {
const worker = $(e.currentTarget).attr("worker");
Fwk.find("Workers", "MySQL Queries").set_worker(worker);
Fwk.show("Workers", "MySQL Queries");
};
tbody.find("tr.display-worker-queries").click(displayWorkerQueries);
}
}
return QservMySQLConnections;
Expand Down
Loading

0 comments on commit d2346bc

Please sign in to comment.