diff --git a/src/wbase/Task.h b/src/wbase/Task.h index 2365a9db2..c94ff7cbe 100644 --- a/src/wbase/Task.h +++ b/src/wbase/Task.h @@ -285,6 +285,9 @@ class Task : public util::CommandForThreadPool { /// Return a reference to the list of subchunk ids. const IntVector& getSubchunksVect() const { return _dbTblsAndSubchunks->subchunksVect; } + /// Return an identifier of the corresponding MySQL query (if any was set). + unsigned long getMySqlThreadId() const { return _mysqlThreadId.load(); } + /// Set MySQL thread associated with a MySQL connection open before executing /// task's queries. The identifier is sampled by the worker tasks monitoring /// system in order to see what MySQL queries are being executed by tasks. diff --git a/src/wcontrol/Foreman.h b/src/wcontrol/Foreman.h index a6ba85804..a539f0782 100644 --- a/src/wcontrol/Foreman.h +++ b/src/wcontrol/Foreman.h @@ -119,6 +119,7 @@ class Foreman : public wbase::MsgProcessor { std::shared_ptr const& chunkResourceMgr() const { return _chunkResourceMgr; } mysql::MySqlConfig const& mySqlConfig() const { return _mySqlConfig; } + std::shared_ptr const& queriesAndChunks() const { return _queries; } std::shared_ptr const& sqlConnMgr() const { return _sqlConnMgr; } std::shared_ptr const& transmitMgr() const { return _transmitMgr; } uint16_t httpPort() const; diff --git a/src/wdb/QueryRunner.cc b/src/wdb/QueryRunner.cc index 49fa1bc8e..98bf0a595 100644 --- a/src/wdb/QueryRunner.cc +++ b/src/wdb/QueryRunner.cc @@ -65,6 +65,7 @@ #include "util/threadSafe.h" #include "wbase/Base.h" #include "wbase/ChannelShared.h" +#include "wconfig/WorkerConfig.h" #include "wcontrol/SqlConnMgr.h" #include "wdb/ChunkResource.h" #include "wpublish/QueriesAndChunks.h" @@ -299,7 +300,9 @@ bool QueryRunner::_dispatchChannel() { // This thread may have already been removed from the pool for // other reasons, such as taking too long. - if (not _removedFromThreadPool) { + bool const streamingProtocol = wconfig::WorkerConfig::instance()->resultDeliveryProtocol() == + wconfig::WorkerConfig::ResultDeliveryProtocol::SSI; + if (streamingProtocol && !_removedFromThreadPool) { // This query has been answered by the database and the // scheduler for this worker should stop waiting for it. // leavePool() will tell the scheduler this task is finished diff --git a/src/wpublish/GetDbStatusCommand.cc b/src/wpublish/GetDbStatusCommand.cc index 58276467d..7dd42a671 100644 --- a/src/wpublish/GetDbStatusCommand.cc +++ b/src/wpublish/GetDbStatusCommand.cc @@ -24,6 +24,7 @@ #include "wpublish/GetDbStatusCommand.h" // System headers +#include #include // Third party headers @@ -34,6 +35,7 @@ #include "proto/worker.pb.h" #include "wbase/SendChannel.h" #include "wconfig/WorkerConfig.h" +#include "wpublish/QueriesAndChunks.h" // LSST headers #include "lsst/log/Log.h" @@ -49,8 +51,9 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.wpublish.GetDbStatusCommand"); namespace lsst::qserv::wpublish { -GetDbStatusCommand::GetDbStatusCommand(shared_ptr const& sendChannel) - : wbase::WorkerCommand(sendChannel) {} +GetDbStatusCommand::GetDbStatusCommand(shared_ptr const& sendChannel, + shared_ptr const& queriesAndChunks) + : wbase::WorkerCommand(sendChannel), _queriesAndChunks(queriesAndChunks) {} void GetDbStatusCommand::run() { string const context = "GetDbStatusCommand::" + string(__func__); @@ -65,6 +68,19 @@ void GetDbStatusCommand::run() { reportError(ex.what()); return; } + + // Amend the result with a map linking MySQL thread identifiers to the corresponding + // tasks that are being (or have been) processed by the worker. Note that only a subset + // of tasks is selected for the known MySQL threads. This prevents the monitoring + // system from pulling old tasks that may still keep records of the closed threads. + set activeMySqlThreadIds; + for (auto const& row : result["queries"]["rows"]) { + // The thread identifier is stored as a string at the very first element + // of the array. See mysql::MySqlUtils::processList for details. + activeMySqlThreadIds.insert(stoul(row[0].get())); + } + result["mysql_thread_to_task"] = _queriesAndChunks->mySqlThread2task(activeMySqlThreadIds); + proto::WorkerCommandGetDbStatusR reply; reply.mutable_status(); reply.set_info(result.dump()); diff --git a/src/wpublish/GetDbStatusCommand.h b/src/wpublish/GetDbStatusCommand.h index 242fd1b49..92cba286d 100644 --- a/src/wpublish/GetDbStatusCommand.h +++ b/src/wpublish/GetDbStatusCommand.h @@ -33,6 +33,11 @@ namespace lsst::qserv::wbase { class SendChannel; } // namespace lsst::qserv::wbase +// Forward declarations +namespace lsst::qserv::wpublish { +class QueriesAndChunks; +} // namespace lsst::qserv::wpublish + // This header declarations namespace lsst::qserv::wpublish { @@ -45,7 +50,8 @@ class GetDbStatusCommand : public wbase::WorkerCommand { /** * @param sendChannel The communication channel for reporting results. */ - GetDbStatusCommand(std::shared_ptr const& sendChannel); + GetDbStatusCommand(std::shared_ptr const& sendChannel, + std::shared_ptr const& queriesAndChunks); GetDbStatusCommand() = delete; GetDbStatusCommand& operator=(GetDbStatusCommand const&) = delete; @@ -55,6 +61,9 @@ class GetDbStatusCommand : public wbase::WorkerCommand { protected: virtual void run() override; + +private: + std::shared_ptr const _queriesAndChunks; }; } // namespace lsst::qserv::wpublish diff --git a/src/wpublish/QueriesAndChunks.cc b/src/wpublish/QueriesAndChunks.cc index d0bdd2312..870084758 100644 --- a/src/wpublish/QueriesAndChunks.cc +++ b/src/wpublish/QueriesAndChunks.cc @@ -432,6 +432,29 @@ nlohmann::json QueriesAndChunks::statusToJson(wbase::TaskSelector const& taskSel return status; } +nlohmann::json QueriesAndChunks::mySqlThread2task(set const& activeMySqlThreadIds) const { + nlohmann::json result = nlohmann::json::object(); + lock_guard g(_queryStatsMtx); + for (auto&& itr : _queryStats) { + QueryStatistics::Ptr const& qStats = itr.second; + for (auto&& task : qStats->_tasks) { + auto const threadId = task->getMySqlThreadId(); + if ((threadId != 0) && activeMySqlThreadIds.contains(threadId)) { + // Force the identifier to be converted into a string because the JSON library + // doesn't support numeric keys in its dictionary class. + result[to_string(threadId)] = + nlohmann::json::object({{"query_id", task->getQueryId()}, + {"job_id", task->getJobId()}, + {"chunk_id", task->getChunkId()}, + {"subchunk_id", task->getSubchunkId()}, + {"template_id", task->getTemplateId()}, + {"state", wbase::taskState2str(task->state())}}); + } + } + } + return result; +} + /// @return a map that contains time totals for all chunks for tasks running on specific /// tables. The map is sorted by table name and contains sub-maps ordered by chunk id. /// The sub-maps contain information about how long tasks take to complete on that table diff --git a/src/wpublish/QueriesAndChunks.h b/src/wpublish/QueriesAndChunks.h index afa6dd1b3..f5214b116 100644 --- a/src/wpublish/QueriesAndChunks.h +++ b/src/wpublish/QueriesAndChunks.h @@ -31,6 +31,7 @@ #include #include #include +#include #include #include @@ -235,12 +236,19 @@ class QueriesAndChunks { void examineAll(); /** - * Retreive monitoring data for teh worker. + * Retreive monitoring data for the worker. * @param taskSelector Task selection criterias. * @return a JSON representation of the object's status for the monitoring */ nlohmann::json statusToJson(wbase::TaskSelector const& taskSelector) const; + /** + * Retrieve info on tasks that are associated with the specified MySQL threads. + * @param activeMySqlThreadIds A collection of the MySQL threads. + * @return a JSON object linking the threads to the corresponding tasks. + */ + nlohmann::json mySqlThread2task(std::set const& activeMySqlThreadIds) const; + // Figure out each chunkTable's percentage of time. // Store average time for a task to run on this table for this chunk. struct ChunkTimePercent { diff --git a/src/www/qserv/css/QservMySQLConnections.css b/src/www/qserv/css/QservMySQLConnections.css index 50b66acb8..ed42965da 100644 --- a/src/www/qserv/css/QservMySQLConnections.css +++ b/src/www/qserv/css/QservMySQLConnections.css @@ -22,3 +22,6 @@ table#fwk-qserv-mysql-connections > thead > tr > th.sticky { top:80px; z-index:2; } +table#fwk-qserv-mysql-connections tbody > tr.display-worker-queries:hover { + cursor:pointer; +} \ No newline at end of file diff --git a/src/www/qserv/js/QservMySQLConnections.js b/src/www/qserv/js/QservMySQLConnections.js index 176e951f3..3c440da2d 100644 --- a/src/www/qserv/js/QservMySQLConnections.js +++ b/src/www/qserv/js/QservMySQLConnections.js @@ -136,32 +136,40 @@ function(CSSLoader, * Display MySQL connections */ _display(data) { + const queryInspectTitle = "Click to see MySQL queries runing on the worker's MySQL server."; let html = ''; for (let worker in data) { - if (!data[worker].success) { - html += ` - - ${worker} -   -   -   -   -   -`; - } else { + let totalCount = ''; + let sqlScanConnCount = ''; + let maxSqlScanConnections = ''; + let sqlSharedConnCount = ''; + let maxSqlSharedConnections = ''; + if (data[worker].success) { let sql_conn_mgr = data[worker].info.processor.sql_conn_mgr; - html += ` - + totalCount = sql_conn_mgr.totalCount; + sqlScanConnCount = sql_conn_mgr.sqlScanConnCount; + maxSqlScanConnections = sql_conn_mgr.maxSqlScanConnections; + sqlSharedConnCount = sql_conn_mgr.sqlSharedConnCount; + maxSqlSharedConnections = sql_conn_mgr.maxSqlSharedConnections; + } + html += ` + ${worker} -
${sql_conn_mgr.totalCount}
-
${sql_conn_mgr.sqlScanConnCount}
-
${sql_conn_mgr.maxSqlScanConnections}
-
${sql_conn_mgr.sqlSharedConnCount}
-
${sql_conn_mgr.maxSqlSharedConnections}
+
${totalCount}
+
${sqlScanConnCount}
+
${maxSqlScanConnections}
+
${sqlSharedConnCount}
+
${maxSqlSharedConnections}
`; - } } - this._table().children('tbody').html(html); + let tbody = this._table().children('tbody'); + tbody.html(html); + let displayWorkerQueries = function(e) { + const worker = $(e.currentTarget).attr("worker"); + Fwk.find("Workers", "MySQL Queries").set_worker(worker); + Fwk.show("Workers", "MySQL Queries"); + }; + tbody.find("tr.display-worker-queries").click(displayWorkerQueries); } } return QservMySQLConnections; diff --git a/src/www/qserv/js/QservWorkerMySQLQueries.js b/src/www/qserv/js/QservWorkerMySQLQueries.js index 4a85d4372..2c89cb4f4 100644 --- a/src/www/qserv/js/QservWorkerMySQLQueries.js +++ b/src/www/qserv/js/QservWorkerMySQLQueries.js @@ -17,10 +17,10 @@ function(CSSLoader, constructor(name) { super(name); - this._queryId2Expanded = {}; // Store 'true' to allow persistent state for the expanded - // queries between updates. - this._id2query = {}; // Store query text for each identifier. The dictionary gets - // updated at each refresh of the page. + this._mySqlThreadId2Expanded = {}; // Store 'true' to allow persistent state for the expanded + // queries between updates. + this._mySqlThreaId2query = {}; // Store query text for each identifier. The dictionary gets + // updated at each refresh of the page. } fwk_app_on_show() { console.log('show: ' + this.fwk_app_name); @@ -43,6 +43,10 @@ function(CSSLoader, } } } + set_worker(worker) { + this._init(); + this._load(worker); + } _init() { if (this._initialized === undefined) this._initialized = false; if (this._initialized) return; @@ -51,11 +55,24 @@ function(CSSLoader,
+ +
+ + +
+
+ + +
${Common.html_update_ival('update-interval', 10)}
@@ -71,8 +88,31 @@ function(CSSLoader, + + + + + + + + + + + + + + + + + + + + + + + @@ -89,6 +129,7 @@ function(CSSLoader, }); cont.find("button#reset-controls-form").click(() => { this._set_update_interval_sec(10); + this._set_query_command('Query'); this._load(); }); } @@ -101,6 +142,9 @@ function(CSSLoader, } _update_interval_sec() { return this._form_control('select', 'update-interval').val(); } _set_update_interval_sec(val) { this._form_control('select', 'update-interval').val(val); } + _set_num_queries(total, displayed) { this._form_control('input', 'num-queries').val(displayed + ' / ' + total); } + _query_command() { return this._form_control('select', 'query-command').val(); } + _set_query_command(val) { this._form_control('select', 'query-command').val(val); } _worker() { return this._form_control('select', 'worker').val(); } _set_worker(val) { this._form_control('select', 'worker').val(val); } _set_workers(workers) { @@ -121,7 +165,7 @@ function(CSSLoader, } return this._table_obj; } - _load() { + _load(worker = undefined) { if (this._loading === undefined) this._loading = false; if (this._loading) return; this._loading = true; @@ -135,6 +179,7 @@ function(CSSLoader, workers.push(data.config.workers[i].name); } this._set_workers(workers); + if (!_.isUndefined(worker)) this._set_worker(worker); this._load_queries(); }, (msg) => { @@ -152,7 +197,7 @@ function(CSSLoader, }, (data) => { if (data.success) { - this._display(data.status.queries); + this._display(data.status); Fwk.setLastUpdate(this._table().children('caption')); } else { console.log('request failed', this.fwk_app_name, data.error); @@ -169,42 +214,99 @@ function(CSSLoader, } ); } - _display(queries) { + _display(status) { + const queryInspectTitle = "Click to see detailed info (progress, messages, etc.) on the query."; const queryCopyTitle = "Click to copy the query text to the clipboard."; const COL_Id = 0, COL_Command = 4, COL_Time = 5, COL_State = 6, COL_Info = 7; + const desiredQueryCommand = this._query_command(); let tbody = this._table().children('tbody'); - if (_.isEmpty(queries.columns)) { + if (_.isEmpty(status.queries.columns)) { tbody.html(''); return; } - this._id2query = {}; + this._mySqlThreaId2query = {}; + let numQueriesTotal = 0; + let numQueriesDisplayed = 0; let html = ''; - for (let i in queries.rows) { - let row = queries.rows[i]; - if (row[COL_Command] !== 'Query') continue; - let queryId = row[COL_Id]; + for (let i in status.queries.rows) { + numQueriesTotal++; + // MySQL query context + let row = status.queries.rows[i]; + const thisQueryCommand = row[COL_Command]; + if ((desiredQueryCommand !== '') && (thisQueryCommand !== desiredQueryCommand)) continue; + let mySqlThreadId = row[COL_Id]; let query = row[COL_Info]; - this._id2query[queryId] = query; - const expanded = (queryId in this._queryId2Expanded) && this._queryId2Expanded[queryId]; + this._mySqlThreaId2query[mySqlThreadId] = query; + const expanded = (mySqlThreadId in this._mySqlThreadId2Expanded) && this._mySqlThreadId2Expanded[mySqlThreadId]; const queryToggleTitle = "Click to toggle query formatting."; const queryStyle = "color:#4d4dff;"; + // Task context (if any) + let queryId = ''; + let jobId = ''; + let chunkId = ''; + let subChunkId = ''; + let templateId = ''; + let state = ''; + if (_.has(status.mysql_thread_to_task, mySqlThreadId)) { + let task = status.mysql_thread_to_task[mySqlThreadId]; + queryId = task['query_id']; + jobId = task['job_id']; + chunkId = task['chunk_id']; + subChunkId = task['subchunk_id']; + templateId = task['template_id']; + state = task['state']; + } + const rowClass = QservWorkerMySQLQueries._state2css(state); html += ` - - + + `; + if (queryId === '') { + html += ` + `; + } else { + html += ` + `; + } + html += ` + + + + + + - + + `; + if (query === '') { + html += ` + + `; + + } else { + html += ` - + `; + } + html += ` `; + numQueriesDisplayed++; } tbody.html(html); let that = this; + let displayQuery = function(e) { + let button = $(e.currentTarget); + let queryId = button.parent().parent().attr("query_id"); + Fwk.find("Status", "Query Inspector").set_query_id(queryId); + Fwk.show("Status", "Query Inspector"); + }; let copyQueryToClipboard = function(e) { let button = $(e.currentTarget); - let queryId = button.parent().parent().attr("id"); - let query = that._id2query[queryId]; + let mySqlThreadId = button.parent().parent().attr("mysql_thread_id"); + let query = that._mySqlThreaId2query[mySqlThreadId]; navigator.clipboard.writeText(query, () => {}, () => { alert("Failed to write the query to the clipboard. Please copy the text manually: " + query); } @@ -213,16 +315,29 @@ function(CSSLoader, let toggleQueryDisplay = function(e) { let td = $(e.currentTarget); let pre = td.find("pre.query"); - const queryId = td.parent().attr("id"); - const expanded = !((queryId in that._queryId2Expanded) && that._queryId2Expanded[queryId]); - pre.text(that._query2text(queryId, expanded)); - that._queryId2Expanded[queryId] = expanded; + const mySqlThreadId = td.parent().attr("mysql_thread_id"); + const expanded = !((mySqlThreadId in that._mySqlThreadId2Expanded) && that._mySqlThreadId2Expanded[mySqlThreadId]); + pre.text(that._query2text(mySqlThreadId, expanded)); + that._mySqlThreadId2Expanded[mySqlThreadId] = expanded; }; + tbody.find("button.inspect-query").click(displayQuery); tbody.find("button.copy-query").click(copyQueryToClipboard); tbody.find("td.query_toggler").click(toggleQueryDisplay); + this._set_num_queries(numQueriesTotal, numQueriesDisplayed); } - _query2text(queryId, expanded) { - return Common.query2text(this._id2query[queryId], expanded); + _query2text(mySqlThreadId, expanded) { + return Common.query2text(this._mySqlThreaId2query[mySqlThreadId], expanded); + } + static _state2css(state) { + switch (state) { + case 'CREATED': return 'table-warning'; + case 'QUEUED': return 'table-light'; + case 'STARTED': return 'table-danger'; + case 'EXECUTING_QUERY': return 'table-primary'; + case 'READING_DATA': return 'table-info'; + case 'FINISHED': return 'table-secondary'; + default: return ''; + } } } return QservWorkerMySQLQueries; diff --git a/src/xrdsvc/SsiRequest.cc b/src/xrdsvc/SsiRequest.cc index 91136f74d..e17289256 100644 --- a/src/xrdsvc/SsiRequest.cc +++ b/src/xrdsvc/SsiRequest.cc @@ -395,7 +395,8 @@ wbase::WorkerCommand::Ptr SsiRequest::parseWorkerCommand( break; } case proto::WorkerCommandH::GET_DATABASE_STATUS: { - command = std::make_shared(sendChannel); + command = std::make_shared(sendChannel, + _foreman->queriesAndChunks()); break; } case proto::WorkerCommandH::GET_CONFIG: {
Task      MySQL     
QIDjobchunksubchunktemplstate Id TimeCommand State Query
${queryId}
${queryId}
  + +
${jobId}
${chunkId}
${subChunkId}
${templateId}
${state}
${mySqlThreadId}
${row[COL_Time]}
${row[COL_State]}
${row[COL_Command]}
${row[COL_State]}
  
` + this._query2text(queryId, expanded) + `
` + this._query2text(mySqlThreadId, expanded) + `