diff --git a/core/modules/ccontrol/UserQuerySelect.cc b/core/modules/ccontrol/UserQuerySelect.cc
index 827364e05f..2fad13d5cf 100644
--- a/core/modules/ccontrol/UserQuerySelect.cc
+++ b/core/modules/ccontrol/UserQuerySelect.cc
@@ -368,31 +368,24 @@ void UserQuerySelect::submit() {

         if (!uberJobsEnabled) {
             std::function<void(util::CmdData*)> funcBuildJob =
-                    [this, sequence, job{move(job)}](util::CmdData*) { // references in captures cause races
+                    //&&&[this, sequence, job{move(job)}](util::CmdData*) { // references in captures cause races
+                    [this, job{move(job)}](util::CmdData*) { // references in captures cause races
                         QSERV_LOGCONTEXT_QUERY(_qMetaQueryId);
                         job->runJob();
                     };
             auto cmd = std::make_shared<qdisp::PriorityCommand>(funcBuildJob);
             _executive->queueJobStart(cmd);
-        }
         ++sequence;
     }

     if (uberJobsEnabled) {
         vector<qdisp::UberJob::Ptr> uberJobs;
-        /* &&&
-        vector<czar::WorkerResource> workers; // &&& delete and replace with a real list of workers
-        throw Bug("&&&NEED_CODE to find all workers"); // workers = all workers found in database
-        for (auto&& worker:workers) {
-            worker.fillChunkIdSet();
-        }
-        */
         czar::WorkerResources workerResources;
-        workerResources.setMonoNodeTest(); //&&& TODO:UJ only good for mono-node test.
+        workerResources.setMonoNodeTest(); //&&& TODO:UJ only good for mono-node test. Need a real list of workers and their chunks.

-        // &&& make a map of all jobs in the executive.
+        // Make a map of all jobs in the executive.
         // &&& TODO:UJ for now, just using ints. At some point, need to check that ResourceUnit databases can be found for all databases in the query.
         qdisp::Executive::ChunkIdJobMapType chunksInQuery = _executive->getChunkJobMapAndInvalidate();
@@ -405,13 +398,6 @@ void UserQuerySelect::submit() {

         /// make a map that will be destroyed as chunks are checked/used
         map<string, deque<int>> tmpWorkerList = workerResources.getDequesFor(dbName);
-        /* &&&
-        list<czar::WorkerResource> tmpWorkerList;
-        for(auto&& worker:workers) {
-            tmpWorkerList.push_back(worker);
-        }
-        */
-
         // TODO:UJ So UberJobIds don't conflict with chunk numbers or jobIds, start at a large number.
         // This could use some refinement.
         int uberJobId = qdisp::UberJob::getFirstIdNumber();
@@ -461,26 +447,49 @@ void UserQuerySelect::submit() {
                 workerIter = tmpWorkerList.begin();
             }
         }
+        LOGS(_log, LOG_LVL_INFO, "&&& submit m");
         _executive->addUberJobs(uberJobs);
+        LOGS(_log, LOG_LVL_INFO, "&&& submit n");
         for (auto&& uJob:uberJobs) {
+            LOGS(_log, LOG_LVL_INFO, "&&& submit o");
             uJob->runUberJob();
+            LOGS(_log, LOG_LVL_INFO, "&&& submit p");
         }
-        _executive->startRemainingJobs();
+        LOGS(_log, LOG_LVL_INFO, "&&& submit q");
+        // If any chunks in the query were not found on a worker's list, run them individually.
+        //&&&_executive->startRemainingJobs(chunksInQuery); //&&& delete func in Executive.
+        for (auto& ciq:chunksInQuery) {
+            qdisp::JobQuery* jqRaw = ciq.second;
+            qdisp::JobQuery::Ptr job = _executive->getSharedPtrForRawJobPtr(jqRaw);
+            std::function<void(util::CmdData*)> funcBuildJob =
+                    [this, job{move(job)}](util::CmdData*) { // references in captures cause races
+                        QSERV_LOGCONTEXT_QUERY(_qMetaQueryId);
+                        job->runJob();
+                    };
+            auto cmd = std::make_shared<qdisp::PriorityCommand>(funcBuildJob);
+            _executive->queueJobStart(cmd);
+        }
+
+        LOGS(_log, LOG_LVL_INFO, "&&& submit r");
     }

     // attempt to restore original thread priority, requires root
     if (increaseThreadPriority) {
         threadPriority.restoreOriginalValues();
     }
+    LOGS(_log, LOG_LVL_INFO, "&&& submit s");

     LOGS(_log, LOG_LVL_DEBUG, "total jobs in query=" << sequence);
     _executive->waitForAllJobsToStart();
+    LOGS(_log, LOG_LVL_INFO, "&&& submit t");

     // we only care about per-chunk info for ASYNC queries
     if (_async) {
+        LOGS(_log, LOG_LVL_INFO, "&&& submit u");
         std::lock_guard<std::mutex> lock(chunksMtx);
         _qMetaAddChunks(chunks);
     }
+    LOGS(_log, LOG_LVL_INFO, "&&& submit v");
}
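The fallback loop added above queues each chunk that no UberJob claimed, using the same capture-by-value lambda as the non-UberJob path: copying the `JobQuery::Ptr` into the closure keeps the job alive and avoids the capture-by-reference races the comment warns about. A minimal standalone sketch of that idiom follows; the `Job` type and queue here are illustrative stand-ins, not Qserv's `PriorityCommand`/`QdispPool`:

    #include <functional>
    #include <memory>
    #include <queue>

    struct Job {
        void runJob() { /* dispatch one chunk query */ }
    };

    // Stand-in for the executive's dispatch pool.
    std::queue<std::function<void()>> commandQueue;

    void queueJobStart(std::shared_ptr<Job> job) {
        // Capture job by value: the closure owns its own shared_ptr copy, so
        // the Job outlives the enclosing scope and the capture cannot dangle.
        commandQueue.push([job]() { job->runJob(); });
    }

Note that the patch's lambda still captures `this` (a raw pointer), so it continues to rely on the `UserQuerySelect` outliving the queued command.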
diff --git a/core/modules/qdisp/Executive.cc b/core/modules/qdisp/Executive.cc
index 4429eab878..9e18752320 100644
--- a/core/modules/qdisp/Executive.cc
+++ b/core/modules/qdisp/Executive.cc
@@ -246,6 +246,20 @@ bool Executive::_addJobToMap(JobQuery::Ptr const& job) {
     return res;
 }

+
+JobQuery::Ptr Executive::getSharedPtrForRawJobPtr(JobQuery* jqRaw) {
+    assert(jqRaw != nullptr);
+    int jobId = jqRaw->getIdInt();
+    lock_guard<mutex> lockJobMap(_jobMapMtx);
+    auto iter = _jobMap.find(jobId);
+    if (iter == _jobMap.end()) {
+        throw Bug("Could not find the entry for jobId=" + to_string(jobId));
+    }
+    JobQuery::Ptr jq = iter->second;
+    return jq;
+}
+
+
 bool Executive::join() {
     // To join, we make sure that all of the chunks added so far are complete.
     // Check to see if _requesters is empty, if not, then sleep on a condition.
@@ -611,7 +625,7 @@ bool Executive::startUberJob(UberJob::Ptr const& uJob) {
 }


-void Executive::startRemainingJobs() {
+void Executive::startRemainingJobs(ChunkIdJobMapType& remainingChunks) {
     throw Bug("&&&NEED_CODE executive start remaining jobs");
 }
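`getSharedPtrForRawJobPtr()` recovers an owning `shared_ptr` for a raw `JobQuery*` by re-looking the job up by id in `_jobMap` under the map mutex, and throws if the id is unknown. The same idiom in self-contained form (names are placeholders for the Qserv types):

    #include <cassert>
    #include <map>
    #include <memory>
    #include <mutex>
    #include <stdexcept>
    #include <string>

    struct JobQuery {
        explicit JobQuery(int id) : _id(id) {}
        int getIdInt() const { return _id; }
        int _id;
    };

    std::mutex jobMapMtx;
    std::map<int, std::shared_ptr<JobQuery>> jobMap;

    std::shared_ptr<JobQuery> sharedPtrForRaw(JobQuery* raw) {
        assert(raw != nullptr);
        std::lock_guard<std::mutex> lock(jobMapMtx);
        auto iter = jobMap.find(raw->getIdInt());
        if (iter == jobMap.end()) {
            throw std::out_of_range("no entry for jobId=" + std::to_string(raw->getIdInt()));
        }
        return iter->second;  // a copy of the owning shared_ptr
    }

Looking the job up by id, rather than wrapping the raw pointer in a new `shared_ptr`, guarantees the function can never create a second control block for the same object.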
diff --git a/core/modules/qdisp/Executive.h b/core/modules/qdisp/Executive.h
index b7d6d3177d..4dcbb90154 100644
--- a/core/modules/qdisp/Executive.h
+++ b/core/modules/qdisp/Executive.h
@@ -141,12 +141,13 @@ class Executive : public std::enable_shared_from_this<Executive> {
     bool startQuery(std::shared_ptr<JobQuery> const& jobQuery);

     /// Start any jobs that were not started as part of UberJobs.
-    void startRemainingJobs();
+    void startRemainingJobs(ChunkIdJobMapType& remainingJobs); // &&& delete

     ///&&& TODO:UJ UberJob
     void addUberJobs(std::vector<std::shared_ptr<UberJob>> const& jobsToAdd);
     ChunkIdJobMapType& getChunkJobMapAndInvalidate();
     bool startUberJob(std::shared_ptr<UberJob> const& uJob);
+    std::shared_ptr<JobQuery> getSharedPtrForRawJobPtr(JobQuery* jqRaw);

 private:
     Executive(ExecutiveConfig const& c, std::shared_ptr<MessageStore> const& ms,
diff --git a/core/modules/qdisp/UberJob.cc b/core/modules/qdisp/UberJob.cc
index 638f9b48c4..67b6b5d1e3 100644
--- a/core/modules/qdisp/UberJob.cc
+++ b/core/modules/qdisp/UberJob.cc
@@ -60,6 +60,7 @@ UberJob::UberJob(Executive::Ptr const& executive,
     : JobBase(), _executive(executive), _respHandler(respHandler), _queryId(queryId),
       _uberJobId(uberJobId), _czarId(czarId),
       _idStr("QID=" + to_string(_queryId) + ":uber=" + to_string(uberJobId)) {
     _qdispPool = executive->getQdispPool();
+    _jobStatus = make_shared<JobStatus>();
 }
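The constructor change creates the `JobStatus` as soon as the UberJob exists; `runUberJob()` later calls `_jobStatus->updateInfo(...)`, which would dereference a null pointer if construction skipped this step. A sketch of the shape, with a simplified stand-in for `qdisp::JobStatus`:

    #include <memory>
    #include <string>

    // Simplified stand-in for qdisp::JobStatus.
    struct JobStatus {
        enum State { UNKNOWN, REQUEST };
        void updateInfo(std::string const& idStr, State s) {
            _lastId = idStr;
            _state = s;
        }
        std::string _lastId;
        State _state = UNKNOWN;
    };

    struct UberJobSketch {
        // Allocate the status eagerly so every later updateInfo() has a target.
        UberJobSketch() : _jobStatus(std::make_shared<JobStatus>()) {}
        void markRequested() { _jobStatus->updateInfo("QID=...", JobStatus::REQUEST); }
        std::shared_ptr<JobStatus> _jobStatus;
    };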
@@ -77,6 +78,7 @@ bool UberJob::addJob(JobQuery* job) {

 bool UberJob::runUberJob() {
     QSERV_LOGCONTEXT_QUERY_JOB(getQueryId(), getIdInt());
+    LOGS(_log, LOG_LVL_INFO, "&&& runUberJob a");
     // Build the uberjob payload.
     // TODO:UJ For simplicity in the first pass, just make a TaskMsg for each Job and append it to the UberJobMsg.
     // This is terribly inefficient and should be replaced by using a template and list of chunks that the
@@ -86,22 +88,32 @@ bool UberJob::runUberJob() {
         proto::UberJobMsg* ujMsg = google::protobuf::Arena::CreateMessage<proto::UberJobMsg>(&arena);
         ujMsg->set_queryid(getQueryId());
         ujMsg->set_czarid(_czarId);
+        LOGS(_log, LOG_LVL_INFO, "&&& runUberJob b");
         for (auto&& job:_jobs) {
+            LOGS(_log, LOG_LVL_INFO, "&&& runUberJob b1");
             proto::TaskMsg* tMsg = ujMsg->add_taskmsgs();
+            LOGS(_log, LOG_LVL_INFO, "&&& runUberJob b2");
             job->getDescription()->fillTaskMsg(tMsg);
+            LOGS(_log, LOG_LVL_INFO, "&&& runUberJob b3");
         }
+        LOGS(_log, LOG_LVL_INFO, "&&& runUberJob c");
         ujMsg->SerializeToString(&_payload);
+        LOGS(_log, LOG_LVL_INFO, "&&& runUberJob d");
     }
+    LOGS(_log, LOG_LVL_INFO, "&&& runUberJob e");

     auto executive = _executive.lock();
     if (executive == nullptr) {
         LOGS(_log, LOG_LVL_ERROR, "runUberJob failed executive==nullptr");
         return false;
     }
+    LOGS(_log, LOG_LVL_INFO, "&&& runUberJob f");
     bool cancelled = executive->getCancelled();
     bool handlerReset = _respHandler->reset();
     bool started = _started.exchange(true);
+    LOGS(_log, LOG_LVL_INFO, "&&& runUberJob g");
     if (!cancelled && handlerReset && !started) {
+        LOGS(_log, LOG_LVL_INFO, "&&& runUberJob h");
         auto criticalErr = [this, &executive](std::string const& msg) {
             LOGS(_log, LOG_LVL_ERROR, msg << " " << *this << " Canceling user query!");
@@ -114,16 +126,20 @@ bool UberJob::runUberJob() {
             return false;
         }

+        LOGS(_log, LOG_LVL_INFO, "&&& runUberJob i");
         // At this point we are all set to actually run the queries. We create a
         // shared pointer to this object to prevent it from escaping while we
         // are trying to start this whole process. We also make sure we record
         // whether or not we are in SSI as cancellation handling differs.
         //
-        LOGS(_log, LOG_LVL_TRACE, "runJob calls StartQuery()");
+        LOGS(_log, LOG_LVL_TRACE, "runUberJob calls StartQuery()");
         std::shared_ptr<UberJob> uJob(std::dynamic_pointer_cast<UberJob>(shared_from_this()));
+        LOGS(_log, LOG_LVL_INFO, "&&& runUberJob j");
         _inSsi = true;
         if (executive->startUberJob(uJob)) {
+            LOGS(_log, LOG_LVL_INFO, "&&& runUberJob k");
             _jobStatus->updateInfo(_idStr, JobStatus::REQUEST);
+            LOGS(_log, LOG_LVL_INFO, "&&& runUberJob l");
             return true;
         }
         _inSsi = false;
diff --git a/core/modules/qproc/TaskMsgFactory.cc b/core/modules/qproc/TaskMsgFactory.cc
index f2d491cf93..c85cb28908 100644
--- a/core/modules/qproc/TaskMsgFactory.cc
+++ b/core/modules/qproc/TaskMsgFactory.cc
@@ -114,6 +114,7 @@ bool TaskMsgFactory::fillTaskMsg(proto::TaskMsg* taskMsg, ChunkQuerySpec const&
         _addFragment(*taskMsg, resultTable, chunkQuerySpec.subChunkTables,
                      chunkQuerySpec.subChunkIds, chunkQuerySpec.queries);
     }
+    LOGS(_log, LOG_LVL_WARN, "&&& fillTaskMsg end chunkId=" << chunkQuerySpec.chunkId);
     return true;
 }
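In `runUberJob()` above, the object pins itself before handing control to SSI: `shared_from_this()` yields a `shared_ptr` to the `JobBase` base class, and `dynamic_pointer_cast` recovers the `UberJob` type that `Executive::startUberJob()` expects. The keep-alive idiom in isolation, with illustrative class names rather than the Qserv ones:

    #include <cassert>
    #include <memory>

    struct JobBase : std::enable_shared_from_this<JobBase> {
        virtual ~JobBase() = default;  // polymorphic, so dynamic casts work
    };

    struct UberJob : JobBase {
        bool runUberJob() {
            // shared_from_this() only knows the base type; downcast to pass
            // the derived pointer on, and hold it so *this cannot be
            // destroyed while startup is still in progress.
            std::shared_ptr<UberJob> self =
                    std::dynamic_pointer_cast<UberJob>(shared_from_this());
            assert(self != nullptr);
            return startUberJob(self);  // stand-in for executive->startUberJob(uJob)
        }
        static bool startUberJob(std::shared_ptr<UberJob> const& uj) { return uj != nullptr; }
    };

    int main() {
        auto uj = std::make_shared<UberJob>();
        return uj->runUberJob() ? 0 : 1;
    }

`shared_from_this()` throws `std::bad_weak_ptr` if the instance is not already managed by a `shared_ptr`, so the pattern assumes UberJobs are always created via `make_shared`, as the factory code in this patch does.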