diff --git a/src/replica/apps/WorkerApp.cc b/src/replica/apps/WorkerApp.cc index ca5d6c165..c4ce2dacb 100644 --- a/src/replica/apps/WorkerApp.cc +++ b/src/replica/apps/WorkerApp.cc @@ -40,7 +40,6 @@ #include "replica/util/FileUtils.h" #include "replica/worker/FileServer.h" #include "replica/worker/WorkerProcessor.h" -#include "replica/worker/WorkerRequestFactory.h" #include "replica/worker/WorkerServer.h" // LSST headers @@ -111,13 +110,7 @@ int WorkerApp::runImpl() { _verifyCreateFolders(); - // Configure the factory with a pool of persistent connectors - auto const config = serviceProvider()->config(); - auto const connectionPool = ConnectionPool::create(Configuration::qservWorkerDbParams(), - config->get("database", "services-pool-size")); - WorkerRequestFactory requestFactory(serviceProvider(), connectionPool); - - auto const reqProcSvr = WorkerServer::create(serviceProvider(), requestFactory, worker); + auto const reqProcSvr = WorkerServer::create(serviceProvider(), worker); thread reqProcSvrThread([reqProcSvr]() { reqProcSvr->run(); }); auto const fileSvr = FileServer::create(serviceProvider(), worker); @@ -147,8 +140,8 @@ int WorkerApp::runImpl() { << " new, in-progress, finished: " << reqProcSvr->processor()->numNewRequests() << ", " << reqProcSvr->processor()->numInProgressRequests() << ", " << reqProcSvr->processor()->numFinishedRequests()); - this_thread::sleep_for( - chrono::seconds(max(1U, config->get("registry", "heartbeat-ival-sec")))); + this_thread::sleep_for(chrono::seconds( + max(1U, serviceProvider()->config()->get("registry", "heartbeat-ival-sec")))); } reqProcSvrThread.join(); fileSvrThread.join(); diff --git a/src/replica/worker/CMakeLists.txt b/src/replica/worker/CMakeLists.txt index f45a1a309..a37868d82 100644 --- a/src/replica/worker/CMakeLists.txt +++ b/src/replica/worker/CMakeLists.txt @@ -13,7 +13,6 @@ target_sources(replica_worker PRIVATE WorkerProcessorThread.cc WorkerReplicationRequest.cc WorkerRequest.cc - WorkerRequestFactory.cc WorkerServer.cc WorkerServerConnection.cc WorkerSqlRequest.cc diff --git a/src/replica/worker/WorkerDeleteRequest.cc b/src/replica/worker/WorkerDeleteRequest.cc index 102883d8c..3b1e07008 100644 --- a/src/replica/worker/WorkerDeleteRequest.cc +++ b/src/replica/worker/WorkerDeleteRequest.cc @@ -47,17 +47,15 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.WorkerDeleteRequest"); namespace lsst::qserv::replica { -////////////////////////////////////////////////////////////// -///////////////////// WorkerDeleteRequest //////////////////// -////////////////////////////////////////////////////////////// - WorkerDeleteRequest::Ptr WorkerDeleteRequest::create(ServiceProvider::Ptr const& serviceProvider, string const& worker, string const& id, int priority, ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, ProtocolRequestDelete const& request) { - return WorkerDeleteRequest::Ptr(new WorkerDeleteRequest(serviceProvider, worker, id, priority, onExpired, - requestExpirationIvalSec, request)); + auto ptr = WorkerDeleteRequest::Ptr(new WorkerDeleteRequest( + serviceProvider, worker, id, priority, onExpired, requestExpirationIvalSec, request)); + ptr->init(); + return ptr; } WorkerDeleteRequest::WorkerDeleteRequest(ServiceProvider::Ptr const& serviceProvider, string const& worker, @@ -85,35 +83,8 @@ void WorkerDeleteRequest::setInfo(ProtocolResponseDelete& response) const { bool WorkerDeleteRequest::execute() { LOGS(_log, LOG_LVL_DEBUG, context(__func__) << " db: " << database() << " chunk: " << chunk()); - return WorkerRequest::execute(); -} - -/////////////////////////////////////////////////////////////////// -///////////////////// WorkerDeleteRequestPOSIX //////////////////// -/////////////////////////////////////////////////////////////////// - -WorkerDeleteRequestPOSIX::Ptr WorkerDeleteRequestPOSIX::create(ServiceProvider::Ptr const& serviceProvider, - string const& worker, string const& id, - int priority, - ExpirationCallbackType const& onExpired, - unsigned int requestExpirationIvalSec, - ProtocolRequestDelete const& request) { - return WorkerDeleteRequestPOSIX::Ptr(new WorkerDeleteRequestPOSIX( - serviceProvider, worker, id, priority, onExpired, requestExpirationIvalSec, request)); -} - -WorkerDeleteRequestPOSIX::WorkerDeleteRequestPOSIX(ServiceProvider::Ptr const& serviceProvider, - string const& worker, string const& id, int priority, - ExpirationCallbackType const& onExpired, - unsigned int requestExpirationIvalSec, - ProtocolRequestDelete const& request) - : WorkerDeleteRequest(serviceProvider, worker, id, priority, onExpired, requestExpirationIvalSec, - request) {} - -bool WorkerDeleteRequestPOSIX::execute() { - LOGS(_log, LOG_LVL_DEBUG, context(__func__) << " db: " << database() << " chunk: " << chunk()); - replica::Lock lock(_mtx, context(__func__)); + checkIfCancelling(lock, __func__); auto const config = _serviceProvider->config(); DatabaseInfo const databaseInfo = config->databaseInfo(database()); diff --git a/src/replica/worker/WorkerDeleteRequest.h b/src/replica/worker/WorkerDeleteRequest.h index 42516dcd3..766c2ab56 100644 --- a/src/replica/worker/WorkerDeleteRequest.h +++ b/src/replica/worker/WorkerDeleteRequest.h @@ -35,10 +35,7 @@ namespace lsst::qserv::replica { /** * Class WorkerDeleteRequest represents a context and a state of replica deletion - * requests within the worker servers. It can also be used for testing the framework - * operation as its implementation won't make any changes to any files or databases. - * - * Real implementations of the request processing must derive from this class. + * requests within the worker servers. */ class WorkerDeleteRequest : public WorkerRequest { public: @@ -74,10 +71,7 @@ class WorkerDeleteRequest : public WorkerRequest { ~WorkerDeleteRequest() override = default; - // Trivial get methods - std::string const& database() const { return _request.database(); } - unsigned int chunk() const { return _request.chunk(); } /** @@ -94,47 +88,12 @@ class WorkerDeleteRequest : public WorkerRequest { unsigned int requestExpirationIvalSec, ProtocolRequestDelete const& request); // Input parameters - ProtocolRequestDelete const _request; /// Extended status of the replica deletion request ReplicaInfo _replicaInfo; }; -/** - * Class WorkerDeleteRequestPOSIX provides an actual implementation for - * the replica deletion based on the direct manipulation of files on - * a POSIX file system. - */ -class WorkerDeleteRequestPOSIX : public WorkerDeleteRequest { -public: - typedef std::shared_ptr Ptr; - - static Ptr create(ServiceProvider::Ptr const& serviceProvider, std::string const& worker, - std::string const& id, int priority, ExpirationCallbackType const& onExpired, - unsigned int requestExpirationIvalSec, ProtocolRequestDelete const& request); - - WorkerDeleteRequestPOSIX() = delete; - WorkerDeleteRequestPOSIX(WorkerDeleteRequestPOSIX const&) = delete; - WorkerDeleteRequestPOSIX& operator=(WorkerDeleteRequestPOSIX const&) = delete; - - ~WorkerDeleteRequestPOSIX() final = default; - - bool execute() final; - -private: - WorkerDeleteRequestPOSIX(ServiceProvider::Ptr const& serviceProvider, std::string const& worker, - std::string const& id, int priority, ExpirationCallbackType const& onExpired, - unsigned int requestExpirationIvalSec, ProtocolRequestDelete const& request); -}; - -/** - * Class WorkerDeleteRequestFS has the same implementation as the 'typedef'-ed - * class for the replica deletion based on the direct manipulation of files on - * a POSIX file system. - */ -typedef WorkerDeleteRequestPOSIX WorkerDeleteRequestFS; - } // namespace lsst::qserv::replica #endif // LSST_QSERV_REPLICA_WORKERDELETEREQUEST_H diff --git a/src/replica/worker/WorkerDirectorIndexRequest.cc b/src/replica/worker/WorkerDirectorIndexRequest.cc index b6dce4543..203aa3510 100644 --- a/src/replica/worker/WorkerDirectorIndexRequest.cc +++ b/src/replica/worker/WorkerDirectorIndexRequest.cc @@ -58,9 +58,11 @@ WorkerDirectorIndexRequest::Ptr WorkerDirectorIndexRequest::create( ServiceProvider::Ptr const& serviceProvider, ConnectionPoolPtr const& connectionPool, string const& worker, string const& id, int priority, ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, ProtocolRequestDirectorIndex const& request) { - return WorkerDirectorIndexRequest::Ptr(new WorkerDirectorIndexRequest(serviceProvider, connectionPool, - worker, id, priority, onExpired, - requestExpirationIvalSec, request)); + auto ptr = WorkerDirectorIndexRequest::Ptr( + new WorkerDirectorIndexRequest(serviceProvider, connectionPool, worker, id, priority, onExpired, + requestExpirationIvalSec, request)); + ptr->init(); + return ptr; } WorkerDirectorIndexRequest::WorkerDirectorIndexRequest(ServiceProvider::Ptr const& serviceProvider, @@ -80,14 +82,11 @@ WorkerDirectorIndexRequest::WorkerDirectorIndexRequest(ServiceProvider::Ptr cons void WorkerDirectorIndexRequest::setInfo(ProtocolResponseDirectorIndex& response) const { LOGS(_log, LOG_LVL_DEBUG, context(__func__)); - replica::Lock lock(_mtx, context(__func__)); - response.set_allocated_target_performance(performance().info().release()); response.set_error(_error); response.set_data(_data); response.set_total_bytes(_fileSizeBytes); - *(response.mutable_request()) = _request; } @@ -95,17 +94,8 @@ bool WorkerDirectorIndexRequest::execute() { LOGS(_log, LOG_LVL_DEBUG, context(__func__)); replica::Lock lock(_mtx, context(__func__)); + checkIfCancelling(lock, __func__); - switch (status()) { - case ProtocolStatus::IN_PROGRESS: - break; - case ProtocolStatus::IS_CANCELLING: - setStatus(lock, ProtocolStatus::CANCELLED); - throw WorkerRequestCancelled(); - default: - throw logic_error("WorkerDirectorIndexRequest::" + context(__func__) + - " not allowed while in state: " + WorkerRequest::status2string(status())); - } try { // The table will be scanned only when the offset is set to 0. if (_request.offset() == 0) { diff --git a/src/replica/worker/WorkerDirectorIndexRequest.h b/src/replica/worker/WorkerDirectorIndexRequest.h index 6910ab0ea..8c75d5fa7 100644 --- a/src/replica/worker/WorkerDirectorIndexRequest.h +++ b/src/replica/worker/WorkerDirectorIndexRequest.h @@ -86,7 +86,6 @@ class WorkerDirectorIndexRequest : public WorkerRequest { /** * Extract request status into the Protobuf response object. - * * @param response Protobuf response to be initialized */ void setInfo(ProtocolResponseDirectorIndex& response) const; @@ -147,12 +146,6 @@ class WorkerDirectorIndexRequest : public WorkerRequest { std::string _data; }; -/// Class WorkerDirectorIndexRequest provides an actual implementation -typedef WorkerDirectorIndexRequest WorkerDirectorIndexRequestFS; - -/// Class WorkerDirectorIndexRequest provides an actual implementation -typedef WorkerDirectorIndexRequest WorkerDirectorIndexRequestPOSIX; - } // namespace lsst::qserv::replica #endif // LSST_QSERV_REPLICA_WORKERDIRECTORINDEXREQUEST_H diff --git a/src/replica/worker/WorkerEchoRequest.cc b/src/replica/worker/WorkerEchoRequest.cc index 4a8462292..0d36e2eeb 100644 --- a/src/replica/worker/WorkerEchoRequest.cc +++ b/src/replica/worker/WorkerEchoRequest.cc @@ -47,8 +47,10 @@ WorkerEchoRequest::Ptr WorkerEchoRequest::create(ServiceProvider::Ptr const& ser ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, ProtocolRequestEcho const& request) { - return WorkerEchoRequest::Ptr(new WorkerEchoRequest(serviceProvider, worker, id, priority, onExpired, - requestExpirationIvalSec, request)); + auto ptr = WorkerEchoRequest::Ptr(new WorkerEchoRequest(serviceProvider, worker, id, priority, onExpired, + requestExpirationIvalSec, request)); + ptr->init(); + return ptr; } WorkerEchoRequest::WorkerEchoRequest(ServiceProvider::Ptr const& serviceProvider, string const& worker, @@ -64,10 +66,8 @@ void WorkerEchoRequest::setInfo(ProtocolResponseEcho& response) const { LOGS(_log, LOG_LVL_DEBUG, context(__func__)); replica::Lock lock(_mtx, context(__func__)); - response.set_allocated_target_performance(performance().info().release()); response.set_data(data()); - *(response.mutable_request()) = _request; } @@ -75,28 +75,11 @@ bool WorkerEchoRequest::execute() { LOGS(_log, LOG_LVL_DEBUG, context(__func__) << " delay:" << delay() << " _delayLeft:" << _delayLeft); replica::Lock lock(_mtx, context(__func__)); - - switch (status()) { - case ProtocolStatus::IN_PROGRESS: - break; - - case ProtocolStatus::IS_CANCELLING: - - // Abort the operation right away - - setStatus(lock, ProtocolStatus::CANCELLED); - throw WorkerRequestCancelled(); - - default: - throw logic_error(context(__func__) + - " not allowed while in state: " + WorkerRequest::status2string(status())); - } + checkIfCancelling(lock, __func__); // Block the thread for the random number of milliseconds in the interval // below. Then update the amount of time which is still left. - util::BlockPost blockPost(1000, 2000); - uint64_t const span = blockPost.wait(); _delayLeft -= (span < _delayLeft) ? span : _delayLeft; diff --git a/src/replica/worker/WorkerEchoRequest.h b/src/replica/worker/WorkerEchoRequest.h index 5d47fa716..29822cd1a 100644 --- a/src/replica/worker/WorkerEchoRequest.h +++ b/src/replica/worker/WorkerEchoRequest.h @@ -71,10 +71,7 @@ class WorkerEchoRequest : public WorkerRequest { ~WorkerEchoRequest() override = default; - // Trivial get methods - std::string const& data() const { return _request.data(); } - uint64_t delay() const { return _request.delay(); } /** @@ -91,19 +88,12 @@ class WorkerEchoRequest : public WorkerRequest { unsigned int requestExpirationIvalSec, ProtocolRequestEcho const& request); // Input parameters - ProtocolRequestEcho const _request; /// The amount of the initial delay which is still left uint64_t _delayLeft; }; -/// Class WorkerEchoRequest provides an actual implementation -typedef WorkerEchoRequest WorkerEchoRequestFS; - -/// Class WorkerEchoRequest provides an actual implementation -typedef WorkerEchoRequest WorkerEchoRequestPOSIX; - } // namespace lsst::qserv::replica #endif // LSST_QSERV_REPLICA_WORKERECHOREQUEST_H diff --git a/src/replica/worker/WorkerFindAllRequest.cc b/src/replica/worker/WorkerFindAllRequest.cc index 38f5b1367..b063bf683 100644 --- a/src/replica/worker/WorkerFindAllRequest.cc +++ b/src/replica/worker/WorkerFindAllRequest.cc @@ -48,17 +48,15 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.WorkerFindAllRequest"); namespace lsst::qserv::replica { -/////////////////////////////////////////////////////////////// -///////////////////// WorkerFindAllRequest //////////////////// -/////////////////////////////////////////////////////////////// - WorkerFindAllRequest::Ptr WorkerFindAllRequest::create(ServiceProvider::Ptr const& serviceProvider, string const& worker, string const& id, int priority, ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, ProtocolRequestFindAll const& request) { - return WorkerFindAllRequest::Ptr(new WorkerFindAllRequest(serviceProvider, worker, id, priority, - onExpired, requestExpirationIvalSec, request)); + auto ptr = WorkerFindAllRequest::Ptr(new WorkerFindAllRequest( + serviceProvider, worker, id, priority, onExpired, requestExpirationIvalSec, request)); + ptr->init(); + return ptr; } WorkerFindAllRequest::WorkerFindAllRequest(ServiceProvider::Ptr const& serviceProvider, string const& worker, @@ -72,11 +70,8 @@ WorkerFindAllRequest::WorkerFindAllRequest(ServiceProvider::Ptr const& servicePr void WorkerFindAllRequest::setInfo(ProtocolResponseFindAll& response) const { LOGS(_log, LOG_LVL_DEBUG, context(__func__)); - replica::Lock lock(_mtx, context(__func__)); - response.set_allocated_target_performance(performance().info().release()); - for (auto&& replicaInfo : _replicaInfoCollection) { replicaInfo.setInfo(response.add_replica_info_many()); } @@ -86,61 +81,20 @@ void WorkerFindAllRequest::setInfo(ProtocolResponseFindAll& response) const { bool WorkerFindAllRequest::execute() { LOGS(_log, LOG_LVL_DEBUG, context(__func__) << " database: " << database()); - // Set up the result if the operation is over - - bool completed = WorkerRequest::execute(); - if (completed) { - // Simulate the request processing by making an arbitrary number of - // datasets. - - for (unsigned int chunk = 0; chunk < 8; ++chunk) { - _replicaInfoCollection.emplace_back(ReplicaInfo::COMPLETE, _worker, database(), chunk, - util::TimeUtils::now(), ReplicaInfo::FileInfoCollection()); - } - } - return completed; -} - -//////////////////////////////////////////////////////////////////// -///////////////////// WorkerFindAllRequestPOSIX //////////////////// -//////////////////////////////////////////////////////////////////// - -WorkerFindAllRequestPOSIX::Ptr WorkerFindAllRequestPOSIX::create(ServiceProvider::Ptr const& serviceProvider, - string const& worker, string const& id, - int priority, - ExpirationCallbackType const& onExpired, - unsigned int requestExpirationIvalSec, - ProtocolRequestFindAll const& request) { - return WorkerFindAllRequestPOSIX::Ptr(new WorkerFindAllRequestPOSIX( - serviceProvider, worker, id, priority, onExpired, requestExpirationIvalSec, request)); -} - -WorkerFindAllRequestPOSIX::WorkerFindAllRequestPOSIX(ServiceProvider::Ptr const& serviceProvider, - string const& worker, string const& id, int priority, - ExpirationCallbackType const& onExpired, - unsigned int requestExpirationIvalSec, - ProtocolRequestFindAll const& request) - : WorkerFindAllRequest(serviceProvider, worker, id, priority, onExpired, requestExpirationIvalSec, - request) {} - -bool WorkerFindAllRequestPOSIX::execute() { - LOGS(_log, LOG_LVL_DEBUG, context(__func__) << " database: " << database()); - replica::Lock lock(_mtx, context(__func__)); + checkIfCancelling(lock, __func__); auto const config = _serviceProvider->config(); DatabaseInfo const databaseInfo = config->databaseInfo(database()); // Scan the data directory to find all files which match the expected pattern(s) // and group them by their chunk number - WorkerRequest::ErrorContext errorContext; boost::system::error_code ec; map chunk2fileInfoCollection; { replica::Lock dataFolderLock(_mtxDataFolderOperations, context(__func__)); - fs::path const dataDir = fs::path(config->get("worker", "data-dir")) / database(); fs::file_status const stat = fs::status(dataDir, ec); errorContext = errorContext or @@ -168,7 +122,6 @@ bool WorkerFindAllRequestPOSIX::execute() { "failed to read file mtime: " + entry.path().string()); unsigned const chunk = get<1>(parsed); - chunk2fileInfoCollection[chunk].emplace_back(ReplicaInfo::FileInfo({ entry.path().filename().string(), size, mtime, "", /* cs is never computed for this type of requests */ @@ -191,9 +144,7 @@ bool WorkerFindAllRequestPOSIX::execute() { // Analyze results to see which chunks are complete using chunk 0 as an example // of the total number of files which are normally associated with each chunk. - size_t const numFilesPerChunkRequired = FileUtils::partitionedFiles(databaseInfo, 0).size(); - for (auto&& entry : chunk2fileInfoCollection) { unsigned int const chunk = entry.first; size_t const numFiles = entry.second.size(); @@ -201,7 +152,6 @@ bool WorkerFindAllRequestPOSIX::execute() { numFiles < numFilesPerChunkRequired ? ReplicaInfo::INCOMPLETE : ReplicaInfo::COMPLETE, worker(), database(), chunk, util::TimeUtils::now(), chunk2fileInfoCollection[chunk]); } - setStatus(lock, ProtocolStatus::SUCCESS); return true; } diff --git a/src/replica/worker/WorkerFindAllRequest.h b/src/replica/worker/WorkerFindAllRequest.h index 5622bbb50..97388e9ae 100644 --- a/src/replica/worker/WorkerFindAllRequest.h +++ b/src/replica/worker/WorkerFindAllRequest.h @@ -34,10 +34,7 @@ namespace lsst::qserv::replica { /** * Class WorkerFindAllRequest represents a context and a state of replicas lookup - * requests within the worker servers. It can also be used for testing the framework - * operation as its implementation won't make any changes to any files or databases. - * - * Real implementations of the request processing must derive from this class. + * requests within the worker servers. */ class WorkerFindAllRequest : public WorkerRequest { public: @@ -73,8 +70,6 @@ class WorkerFindAllRequest : public WorkerRequest { ~WorkerFindAllRequest() override = default; - // Trivial get methods - std::string const& database() const { return _request.database(); } /** @@ -91,48 +86,12 @@ class WorkerFindAllRequest : public WorkerRequest { unsigned int requestExpirationIvalSec, ProtocolRequestFindAll const& request); // Input parameters - ProtocolRequestFindAll const _request; /// Result of the operation ReplicaInfoCollection _replicaInfoCollection; }; -/** - * Class WorkerFindAllRequestPOSIX provides an actual implementation for - * the replicas lookup based on the direct manipulation of files on - * a POSIX file system. - */ -class WorkerFindAllRequestPOSIX : public WorkerFindAllRequest { -public: - typedef std::shared_ptr Ptr; - - /// @see WorkerFindAllRequest::create() - static Ptr create(ServiceProvider::Ptr const& serviceProvider, std::string const& worker, - std::string const& id, int priority, ExpirationCallbackType const& onExpired, - unsigned int requestExpirationIvalSec, ProtocolRequestFindAll const& request); - - WorkerFindAllRequestPOSIX() = delete; - WorkerFindAllRequestPOSIX(WorkerFindAllRequestPOSIX const&) = delete; - WorkerFindAllRequestPOSIX& operator=(WorkerFindAllRequestPOSIX const&) = delete; - - ~WorkerFindAllRequestPOSIX() final = default; - - bool execute() final; - -private: - WorkerFindAllRequestPOSIX(ServiceProvider::Ptr const& serviceProvider, std::string const& worker, - std::string const& id, int priority, ExpirationCallbackType const& onExpired, - unsigned int requestExpirationIvalSec, ProtocolRequestFindAll const& request); -}; - -/** - * Class WorkerFindAllRequestFS has the same implementation as the 'typedef'-ed - * class for the replica deletion based on the direct manipulation of files on - * a POSIX file system. - */ -typedef WorkerFindAllRequestPOSIX WorkerFindAllRequestFS; - } // namespace lsst::qserv::replica #endif // LSST_QSERV_REPLICA_WORKERFINDALLREQUEST_H diff --git a/src/replica/worker/WorkerFindRequest.cc b/src/replica/worker/WorkerFindRequest.cc index 76a9abcb1..6d2d9fc69 100644 --- a/src/replica/worker/WorkerFindRequest.cc +++ b/src/replica/worker/WorkerFindRequest.cc @@ -47,17 +47,15 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.WorkerFindRequest"); namespace lsst::qserv::replica { -//////////////////////////////////////////////////////////// -///////////////////// WorkerFindRequest //////////////////// -//////////////////////////////////////////////////////////// - WorkerFindRequest::Ptr WorkerFindRequest::create(ServiceProvider::Ptr const& serviceProvider, string const& worker, string const& id, int priority, ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, ProtocolRequestFind const& request) { - return WorkerFindRequest::Ptr(new WorkerFindRequest(serviceProvider, worker, id, priority, onExpired, - requestExpirationIvalSec, request)); + auto ptr = WorkerFindRequest::Ptr(new WorkerFindRequest(serviceProvider, worker, id, priority, onExpired, + requestExpirationIvalSec, request)); + ptr->init(); + return ptr; } WorkerFindRequest::WorkerFindRequest(ServiceProvider::Ptr const& serviceProvider, string const& worker, @@ -73,10 +71,8 @@ void WorkerFindRequest::setInfo(ProtocolResponseFind& response) const { LOGS(_log, LOG_LVL_DEBUG, context(__func__)); replica::Lock lock(_mtx, context(__func__)); - response.set_allocated_target_performance(performance().info().release()); response.set_allocated_replica_info(_replicaInfo.info().release()); - *(response.mutable_request()) = _request; } @@ -84,50 +80,7 @@ bool WorkerFindRequest::execute() { LOGS(_log, LOG_LVL_DEBUG, context(__func__) << " database: " << database() << " chunk: " << chunk()); replica::Lock lock(_mtx, context(__func__)); - - // Set up the result if the operation is over - - bool completed = WorkerRequest::execute(); - if (completed) { - _replicaInfo = ReplicaInfo(ReplicaInfo::COMPLETE, worker(), database(), chunk(), - util::TimeUtils::now(), ReplicaInfo::FileInfoCollection()); - } - return completed; -} - -///////////////////////////////////////////////////////////////// -///////////////////// WorkerFindRequestPOSIX //////////////////// -///////////////////////////////////////////////////////////////// - -WorkerFindRequestPOSIX::Ptr WorkerFindRequestPOSIX::create(ServiceProvider::Ptr const& serviceProvider, - string const& worker, string const& id, - int priority, - ExpirationCallbackType const& onExpired, - unsigned int requestExpirationIvalSec, - ProtocolRequestFind const& request) { - return WorkerFindRequestPOSIX::Ptr(new WorkerFindRequestPOSIX( - serviceProvider, worker, id, priority, onExpired, requestExpirationIvalSec, request)); -} - -WorkerFindRequestPOSIX::WorkerFindRequestPOSIX(ServiceProvider::Ptr const& serviceProvider, - string const& worker, string const& id, int priority, - ExpirationCallbackType const& onExpired, - unsigned int requestExpirationIvalSec, - ProtocolRequestFind const& request) - : WorkerFindRequest(serviceProvider, worker, id, priority, onExpired, requestExpirationIvalSec, - request) {} - -bool WorkerFindRequestPOSIX::execute() { - LOGS(_log, LOG_LVL_DEBUG, context(__func__) << " database: " << database() << " chunk: " << chunk()); - - replica::Lock lock(_mtx, context(__func__)); - - // Abort the operation right away if that's the case - - if (_status == ProtocolStatus::IS_CANCELLING) { - setStatus(lock, ProtocolStatus::CANCELLED); - throw WorkerRequestCancelled(); - } + checkIfCancelling(lock, __func__); // There are two modes of operation of the code which would depend // on a presence (or a lack of that) to calculate control/check sums diff --git a/src/replica/worker/WorkerFindRequest.h b/src/replica/worker/WorkerFindRequest.h index d4bcba4aa..b523f265d 100644 --- a/src/replica/worker/WorkerFindRequest.h +++ b/src/replica/worker/WorkerFindRequest.h @@ -38,10 +38,7 @@ namespace lsst::qserv::replica { /** * Class WorkerFindRequest represents a context and a state of replica lookup - * requests within the worker servers. It can also be used for testing the framework - * operation as its implementation won't make any changes to any files or databases. - * - * Real implementations of the request processing must derive from this class. + * requests within the worker servers. */ class WorkerFindRequest : public WorkerRequest { public: @@ -77,12 +74,8 @@ class WorkerFindRequest : public WorkerRequest { ~WorkerFindRequest() override = default; - // Trivial get methods - std::string const& database() const { return _request.database(); } - unsigned int chunk() const { return _request.chunk(); } - bool computeCheckSum() const { return _request.compute_cs(); } /** @@ -99,51 +92,15 @@ class WorkerFindRequest : public WorkerRequest { unsigned int requestExpirationIvalSec, ProtocolRequestFind const& request); // Input parameters - ProtocolRequestFind const _request; /// Result of the operation ReplicaInfo _replicaInfo; -}; - -/** - * Class WorkerFindRequestPOSIX provides an actual implementation for - * the replica lookup requests based on the direct manipulation of files on - * a POSIX file system. - */ -class WorkerFindRequestPOSIX : public WorkerFindRequest { -public: - typedef std::shared_ptr Ptr; - - /// @see WorkerFindRequestPOSIX::create() - static Ptr create(ServiceProvider::Ptr const& serviceProvider, std::string const& worker, - std::string const& id, int priority, ExpirationCallbackType const& onExpired, - unsigned int requestExpirationIvalSec, ProtocolRequestFind const& request); - - WorkerFindRequestPOSIX() = delete; - WorkerFindRequestPOSIX(WorkerFindRequestPOSIX const&) = delete; - WorkerFindRequestPOSIX& operator=(WorkerFindRequestPOSIX const&) = delete; - - ~WorkerFindRequestPOSIX() final = default; - - bool execute() final; - -private: - WorkerFindRequestPOSIX(ServiceProvider::Ptr const& serviceProvider, std::string const& worker, - std::string const& id, int priority, ExpirationCallbackType const& onExpired, - unsigned int requestExpirationIvalSec, ProtocolRequestFind const& request); /// The engine for incremental control sum calculation std::unique_ptr _csComputeEnginePtr; }; -/** - * Class WorkerFindRequestFS has the same implementation as the 'typedef'-ed - * class for the replica deletion based on the direct manipulation of files on - * a POSIX file system. - */ -typedef WorkerFindRequestPOSIX WorkerFindRequestFS; - } // namespace lsst::qserv::replica #endif // LSST_QSERV_REPLICA_WORKERFINDREQUEST_H diff --git a/src/replica/worker/WorkerProcessor.cc b/src/replica/worker/WorkerProcessor.cc index 9b1d24343..7a111f3a7 100644 --- a/src/replica/worker/WorkerProcessor.cc +++ b/src/replica/worker/WorkerProcessor.cc @@ -29,13 +29,13 @@ // Qserv headers #include "replica/config/Configuration.h" +#include "replica/mysql/DatabaseMySQL.h" #include "replica/services/ServiceProvider.h" #include "replica/worker/WorkerDeleteRequest.h" #include "replica/worker/WorkerEchoRequest.h" #include "replica/worker/WorkerFindRequest.h" #include "replica/worker/WorkerFindAllRequest.h" #include "replica/worker/WorkerReplicationRequest.h" -#include "replica/worker/WorkerRequestFactory.h" #include "replica/worker/WorkerSqlRequest.h" #include "replica/worker/WorkerDirectorIndexRequest.h" #include "util/BlockPost.h" @@ -91,16 +91,16 @@ string WorkerProcessor::state2string(State state) { } WorkerProcessor::Ptr WorkerProcessor::create(ServiceProvider::Ptr const& serviceProvider, - WorkerRequestFactory const& requestFactory, string const& worker) { - return Ptr(new WorkerProcessor(serviceProvider, requestFactory, worker)); + return Ptr(new WorkerProcessor(serviceProvider, worker)); } -WorkerProcessor::WorkerProcessor(ServiceProvider::Ptr const& serviceProvider, - WorkerRequestFactory const& requestFactory, string const& worker) +WorkerProcessor::WorkerProcessor(ServiceProvider::Ptr const& serviceProvider, string const& worker) : _serviceProvider(serviceProvider), - _requestFactory(requestFactory), _worker(worker), + _connectionPool(database::mysql::ConnectionPool::create( + Configuration::qservWorkerDbParams(), + serviceProvider->config()->get("database", "services-pool-size"))), _state(STATE_IS_STOPPED), _startTime(util::TimeUtils::now()) {} @@ -156,10 +156,8 @@ void WorkerProcessor::drain() { // Collect identifiers of requests to be affected by the operation list ids; - for (auto&& ptr : _newRequests) ids.push_back(ptr->id()); for (auto&& entry : _inProgressRequests) ids.push_back(entry.first); - for (auto&& id : ids) _dequeueOrCancelImpl(lock, id); } @@ -196,20 +194,17 @@ void WorkerProcessor::enqueueForReplication(string const& id, int32_t priority, // won't pass further validation against the present configuration of the request // processing service. try { - auto const ptr = _requestFactory.createReplicationRequest( - _worker, id, priority, bind(&WorkerProcessor::dispose, shared_from_this(), _1), + auto const ptr = WorkerReplicationRequest::create( + _serviceProvider, _worker, id, priority, + [self = shared_from_this()](string const& requestId) { self->dispose(requestId); }, requestExpirationIvalSec, request); _newRequests.push(ptr); - response.set_status(ProtocolStatus::QUEUED); response.set_status_ext(ProtocolStatusExt::NONE); response.set_allocated_performance(ptr->performance().info().release()); - _setInfo(ptr, response); - } catch (invalid_argument const& ec) { LOGS(_log, LOG_LVL_ERROR, _context(__func__) << " " << ec.what()); - setDefaultResponse(response, ProtocolStatus::BAD, ProtocolStatusExt::INVALID_PARAM); } } @@ -239,20 +234,17 @@ void WorkerProcessor::enqueueForDeletion(string const& id, int32_t priority, // won't pass further validation against the present configuration of the request // processing service. try { - auto const ptr = _requestFactory.createDeleteRequest( - _worker, id, priority, bind(&WorkerProcessor::dispose, shared_from_this(), _1), + auto const ptr = WorkerDeleteRequest::create( + _serviceProvider, _worker, id, priority, + [self = shared_from_this()](string const& requestId) { self->dispose(requestId); }, requestExpirationIvalSec, request); _newRequests.push(ptr); - response.set_status(ProtocolStatus::QUEUED); response.set_status_ext(ProtocolStatusExt::NONE); response.set_allocated_performance(ptr->performance().info().release()); - _setInfo(ptr, response); - } catch (invalid_argument const& ec) { LOGS(_log, LOG_LVL_ERROR, _context(__func__) << " " << ec.what()); - setDefaultResponse(response, ProtocolStatus::BAD, ProtocolStatusExt::INVALID_PARAM); } } @@ -271,20 +263,17 @@ void WorkerProcessor::enqueueForFind(string const& id, int32_t priority, // won't pass further validation against the present configuration of the request // processing service. try { - auto const ptr = _requestFactory.createFindRequest( - _worker, id, priority, bind(&WorkerProcessor::dispose, shared_from_this(), _1), + auto const ptr = WorkerFindRequest::create( + _serviceProvider, _worker, id, priority, + [self = shared_from_this()](string const& requestId) { self->dispose(requestId); }, requestExpirationIvalSec, request); _newRequests.push(ptr); - response.set_status(ProtocolStatus::QUEUED); response.set_status_ext(ProtocolStatusExt::NONE); response.set_allocated_performance(ptr->performance().info().release()); - _setInfo(ptr, response); - } catch (invalid_argument const& ec) { LOGS(_log, LOG_LVL_ERROR, _context(__func__) << " " << ec.what()); - setDefaultResponse(response, ProtocolStatus::BAD, ProtocolStatusExt::INVALID_PARAM); } } @@ -301,20 +290,17 @@ void WorkerProcessor::enqueueForFindAll(string const& id, int32_t priority, // won't pass further validation against the present configuration of the request // processing service. try { - auto const ptr = _requestFactory.createFindAllRequest( - _worker, id, priority, bind(&WorkerProcessor::dispose, shared_from_this(), _1), + auto const ptr = WorkerFindAllRequest::create( + _serviceProvider, _worker, id, priority, + [self = shared_from_this()](string const& requestId) { self->dispose(requestId); }, requestExpirationIvalSec, request); _newRequests.push(ptr); - response.set_status(ProtocolStatus::QUEUED); response.set_status_ext(ProtocolStatusExt::NONE); response.set_allocated_performance(ptr->performance().info().release()); - _setInfo(ptr, response); - } catch (invalid_argument const& ec) { LOGS(_log, LOG_LVL_ERROR, _context(__func__) << " " << ec.what()); - setDefaultResponse(response, ProtocolStatus::BAD, ProtocolStatusExt::INVALID_PARAM); } } @@ -333,12 +319,10 @@ void WorkerProcessor::enqueueForEcho(string const& id, int32_t priority, WorkerPerformance performance; performance.setUpdateStart(); performance.setUpdateFinish(); - response.set_status(ProtocolStatus::SUCCESS); response.set_status_ext(ProtocolStatusExt::NONE); response.set_allocated_performance(performance.info().release()); response.set_data(request.data()); - return; } @@ -346,20 +330,18 @@ void WorkerProcessor::enqueueForEcho(string const& id, int32_t priority, // won't pass further validation against the present configuration of the request // processing service. try { - auto const ptr = _requestFactory.createEchoRequest( - _worker, id, priority, bind(&WorkerProcessor::dispose, shared_from_this(), _1), + auto const ptr = WorkerEchoRequest::create( + _serviceProvider, _worker, id, priority, + [self = shared_from_this()](string const& requestId) { self->dispose(requestId); }, requestExpirationIvalSec, request); _newRequests.push(ptr); - response.set_status(ProtocolStatus::QUEUED); response.set_status_ext(ProtocolStatusExt::NONE); response.set_allocated_performance(ptr->performance().info().release()); - _setInfo(ptr, response); } catch (invalid_argument const& ec) { LOGS(_log, LOG_LVL_ERROR, _context(__func__) << " " << ec.what()); - setDefaultResponse(response, ProtocolStatus::BAD, ProtocolStatusExt::INVALID_PARAM); } } @@ -377,20 +359,17 @@ void WorkerProcessor::enqueueForSql(std::string const& id, int32_t priority, // won't pass further validation against the present configuration of the request // processing service. try { - auto const ptr = _requestFactory.createSqlRequest( - _worker, id, priority, bind(&WorkerProcessor::dispose, shared_from_this(), _1), + auto const ptr = WorkerSqlRequest::create( + _serviceProvider, _worker, id, priority, + [self = shared_from_this()](string const& requestId) { self->dispose(requestId); }, requestExpirationIvalSec, request); _newRequests.push(ptr); - response.set_status(ProtocolStatus::QUEUED); response.set_status_ext(ProtocolStatusExt::NONE); response.set_allocated_performance(ptr->performance().info().release()); - _setInfo(ptr, response); - } catch (invalid_argument const& ec) { LOGS(_log, LOG_LVL_ERROR, _context(__func__) << " " << ec.what()); - setDefaultResponse(response, ProtocolStatus::BAD, ProtocolStatusExt::INVALID_PARAM); } } @@ -411,20 +390,17 @@ void WorkerProcessor::enqueueForDirectorIndex(string const& id, int32_t priority // won't pass further validation against the present configuration of the request // processing service. try { - auto const ptr = _requestFactory.createDirectorIndexRequest( - _worker, id, priority, bind(&WorkerProcessor::dispose, shared_from_this(), _1), + auto const ptr = WorkerDirectorIndexRequest::create( + _serviceProvider, _connectionPool, _worker, id, priority, + [self = shared_from_this()](string const& requestId) { self->dispose(requestId); }, requestExpirationIvalSec, request); _newRequests.push(ptr); - response.set_status(ProtocolStatus::QUEUED); response.set_status_ext(ProtocolStatusExt::NONE); response.set_allocated_performance(ptr->performance().info().release()); - _setInfo(ptr, response); - } catch (invalid_argument const& ec) { LOGS(_log, LOG_LVL_ERROR, _context(__func__) << " " << ec.what()); - setDefaultResponse(response, ProtocolStatus::BAD, ProtocolStatusExt::INVALID_PARAM); } } @@ -487,7 +463,6 @@ WorkerRequest::Ptr WorkerProcessor::_dequeueOrCancelImpl(replica::Lock const& lo case ProtocolStatus::SUCCESS: case ProtocolStatus::FAILED: return ptr; - default: throw logic_error(_classMethodContext(__func__) + " unexpected request status " + WorkerRequest::status2string(ptr->status()) + " in in-progress requests"); @@ -586,7 +561,7 @@ void WorkerProcessor::setServiceResponse(ProtocolServiceResponse& response, stri replica::Lock lock(_mtx, _context(__func__)); response.set_status(status); - response.set_technology(_requestFactory.technology()); + response.set_technology("FS"); response.set_start_time(_startTime); switch (state()) { diff --git a/src/replica/worker/WorkerProcessor.h b/src/replica/worker/WorkerProcessor.h index 6e0b914f2..e6dc75e22 100644 --- a/src/replica/worker/WorkerProcessor.h +++ b/src/replica/worker/WorkerProcessor.h @@ -39,9 +39,9 @@ #include "replica/worker/WorkerRequest.h" // Forward declarations -namespace lsst::qserv::replica { -class WorkerRequestFactory; -} // namespace lsst::qserv::replica +namespace lsst::qserv::replica::database::mysql { +class ConnectionPool; +} // namespace lsst::qserv::replica::database::mysql // This header declarations namespace lsst::qserv::replica { @@ -107,12 +107,9 @@ class WorkerProcessor : public std::enable_shared_from_this { * @param serviceProvider provider is needed to access the Configuration of * a setup in order to get a number of the processing threads to be launched * by the processor. - * @param requestFactory reference to a factory of requests (for instantiating - * request objects) * @param worker the name of a worker */ - static Ptr create(ServiceProvider::Ptr const& serviceProvider, WorkerRequestFactory const& requestFactory, - std::string const& worker); + static Ptr create(ServiceProvider::Ptr const& serviceProvider, std::string const& worker); WorkerProcessor() = delete; WorkerProcessor(WorkerProcessor const&) = delete; @@ -336,8 +333,7 @@ class WorkerProcessor : public std::enable_shared_from_this { size_t numFinishedRequests() const; private: - WorkerProcessor(ServiceProvider::Ptr const& serviceProvider, WorkerRequestFactory const& requestFactory, - std::string const& worker); + WorkerProcessor(ServiceProvider::Ptr const& serviceProvider, std::string const& worker); static std::string _classMethodContext(std::string const& func); @@ -506,8 +502,8 @@ class WorkerProcessor : public std::enable_shared_from_this { std::string _context(std::string const& func = std::string()) const { return "PROCESSOR " + func; } ServiceProvider::Ptr const _serviceProvider; - WorkerRequestFactory const& _requestFactory; std::string const _worker; + std::shared_ptr const _connectionPool; State _state; diff --git a/src/replica/worker/WorkerReplicationRequest.cc b/src/replica/worker/WorkerReplicationRequest.cc index 02ce1d2b5..0c3198532 100644 --- a/src/replica/worker/WorkerReplicationRequest.cc +++ b/src/replica/worker/WorkerReplicationRequest.cc @@ -48,18 +48,16 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.WorkerReplicationRequest"); namespace lsst::qserv::replica { -/////////////////////////////////////////////////////////////////// -///////////////////// WorkerReplicationRequest //////////////////// -/////////////////////////////////////////////////////////////////// - WorkerReplicationRequest::Ptr WorkerReplicationRequest::create(ServiceProvider::Ptr const& serviceProvider, string const& worker, string const& id, int priority, ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, ProtocolRequestReplicate const& request) { - return WorkerReplicationRequest::Ptr(new WorkerReplicationRequest( + auto ptr = WorkerReplicationRequest::Ptr(new WorkerReplicationRequest( serviceProvider, worker, id, priority, onExpired, requestExpirationIvalSec, request)); + ptr->init(); + return ptr; } WorkerReplicationRequest::WorkerReplicationRequest(ServiceProvider::Ptr const& serviceProvider, @@ -70,7 +68,13 @@ WorkerReplicationRequest::WorkerReplicationRequest(ServiceProvider::Ptr const& s : WorkerRequest(serviceProvider, worker, "REPLICATE", id, priority, onExpired, requestExpirationIvalSec), _request(request), - _sourceWorkerHostPort(request.worker_host() + ":" + to_string(request.worker_port())) { + _sourceWorkerHostPort(request.worker_host() + ":" + to_string(request.worker_port())), + _databaseInfo(_serviceProvider->config()->databaseInfo(request.database())), + _initialized(false), + _files(FileUtils::partitionedFiles(_databaseInfo, request.chunk())), + _tmpFilePtr(nullptr), + _buf(0), + _bufSize(serviceProvider->config()->get("worker", "fs-buf-size-bytes")) { string const context = "WorkerReplicationRequest::" + string(__func__) + " "; if (worker == request.worker()) { @@ -89,294 +93,29 @@ WorkerReplicationRequest::WorkerReplicationRequest(ServiceProvider::Ptr const& s } } +WorkerReplicationRequest::~WorkerReplicationRequest() { + replica::Lock lock(_mtx, context(__func__)); + _releaseResources(lock); +} + void WorkerReplicationRequest::setInfo(ProtocolResponseReplicate& response) const { LOGS(_log, LOG_LVL_DEBUG, context(__func__)); replica::Lock lock(_mtx, context(__func__)); response.set_allocated_target_performance(performance().info().release()); - response.set_allocated_replica_info(replicaInfo.info().release()); + response.set_allocated_replica_info(_replicaInfo.info().release()); *(response.mutable_request()) = _request; } bool WorkerReplicationRequest::execute() { - LOGS(_log, LOG_LVL_DEBUG, - context(__func__) << " sourceWorkerHostPort: " << sourceWorkerHostPort() << " db: " << database() - << " chunk: " << chunk()); - - bool const complete = WorkerRequest::execute(); - if (complete) { - replicaInfo = ReplicaInfo(ReplicaInfo::COMPLETE, worker(), database(), chunk(), - util::TimeUtils::now(), ReplicaInfo::FileInfoCollection()); - } - return complete; -} - -//////////////////////////////////////////////////////////////////////// -///////////////////// WorkerReplicationRequestPOSIX //////////////////// -//////////////////////////////////////////////////////////////////////// - -WorkerReplicationRequestPOSIX::Ptr WorkerReplicationRequestPOSIX::create( - ServiceProvider::Ptr const& serviceProvider, string const& worker, string const& id, int priority, - ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, - ProtocolRequestReplicate const& request) { - return WorkerReplicationRequestPOSIX::Ptr(new WorkerReplicationRequestPOSIX( - serviceProvider, worker, id, priority, onExpired, requestExpirationIvalSec, request)); -} - -WorkerReplicationRequestPOSIX::WorkerReplicationRequestPOSIX(ServiceProvider::Ptr const& serviceProvider, - string const& worker, string const& id, - int priority, - ExpirationCallbackType const& onExpired, - unsigned int requestExpirationIvalSec, - ProtocolRequestReplicate const& request) - : WorkerReplicationRequest(serviceProvider, worker, id, priority, onExpired, requestExpirationIvalSec, - request) {} - -bool WorkerReplicationRequestPOSIX::execute() { - LOGS(_log, LOG_LVL_DEBUG, - context(__func__) << " sourceWorkerHostPort: " << sourceWorkerHostPort() - << " sourceWorkerDataDir: " << sourceWorkerDataDir() - << " database: " << database() << " chunk: " << chunk()); - - replica::Lock lock(_mtx, context(__func__)); - - // Obtain the list of files to be migrated - // - // IMPLEMENTATION NOTES: - // - // - Note using the overloaded operator '/' which is used to form - // folders and files path names below. The operator will concatenate - // names and also insert a file separator for an operating system - // on which this code will get compiled. - // - // - Temporary file names at a destination folders are prepended with - // prefix '_' to prevent colliding with the canonical names. They will - // be renamed in the last step. - // - // - All operations with the file system namespace (creating new non-temporary - // files, checking for folders and files, renaming files, creating folders, etc.) - // are guarded by acquiring replica::Lock lock(_mtxDataFolderOperations) where it's needed. - - auto const config = serviceProvider()->config(); - DatabaseInfo const databaseInfo = config->databaseInfo(database()); - - fs::path const inDir = fs::path(sourceWorkerDataDir()) / database(); - fs::path const outDir = fs::path(config->get("worker", "data-dir")) / database(); - - vector const files = FileUtils::partitionedFiles(databaseInfo, chunk()); - - vector inFiles; - vector tmpFiles; - vector outFiles; - - map file2inFile; - map file2tmpFile; - map file2outFile; - - map inFile2mtime; - - for (auto&& file : files) { - fs::path const inFile = inDir / file; - inFiles.push_back(inFile); - file2inFile[file] = inFile; - - fs::path const tmpFile = outDir / ("_" + file); - tmpFiles.push_back(tmpFile); - file2tmpFile[file] = tmpFile; - - fs::path const outFile = outDir / file; - outFiles.push_back(outFile); - file2outFile[file] = outFile; - } - - // Check input files, check and sanitize the destination folder - - uintmax_t totalBytes = 0; // the total number of bytes in all input files to be moved - - WorkerRequest::ErrorContext errorContext; - boost::system::error_code ec; - { - replica::Lock dataFolderLock(_mtxDataFolderOperations, context(__func__) + ":1"); - - // Check for a presence of input files and calculate space requirement - - for (auto&& file : inFiles) { - fs::file_status const stat = fs::status(file, ec); - errorContext = errorContext or - reportErrorIf(stat.type() == fs::status_error, ProtocolStatusExt::FILE_STAT, - "failed to check the status of input file: " + file.string()) or - reportErrorIf(not fs::exists(stat), ProtocolStatusExt::NO_FILE, - "the input file does not exist: " + file.string()); - - totalBytes += fs::file_size(file, ec); - errorContext = - errorContext or reportErrorIf(ec.value() != 0, ProtocolStatusExt::FILE_SIZE, - "failed to get the size of input file: " + file.string()); - - inFile2mtime[file] = fs::last_write_time(file, ec); - errorContext = - errorContext or reportErrorIf(ec.value() != 0, ProtocolStatusExt::FILE_MTIME, - "failed to get the mtime of input file: " + file.string()); - } - - // Check and sanitize the output directory - - bool const outDirExists = fs::exists(outDir, ec); - errorContext = errorContext or - reportErrorIf(ec.value() != 0, ProtocolStatusExt::FOLDER_STAT, - "failed to check the status of output directory: " + outDir.string()) or - reportErrorIf(not outDirExists, ProtocolStatusExt::NO_FOLDER, - "the output directory doesn't exist: " + outDir.string()); - - // The files with canonical(!) names should NOT exist at the destination - // folder. - - for (auto&& file : outFiles) { - fs::file_status const stat = fs::status(file, ec); - errorContext = errorContext or - reportErrorIf(stat.type() == fs::status_error, ProtocolStatusExt::FILE_STAT, - "failed to check the status of output file: " + file.string()) or - reportErrorIf(fs::exists(stat), ProtocolStatusExt::FILE_EXISTS, - "the output file already exists: " + file.string()); - } - - // Check if there are any files with the temporary names at the destination - // folder and if so then get rid of them. - - for (auto&& file : tmpFiles) { - fs::file_status const stat = fs::status(file, ec); - errorContext = errorContext or - reportErrorIf(stat.type() == fs::status_error, ProtocolStatusExt::FILE_STAT, - "failed to check the status of temporary file: " + file.string()); - - if (fs::exists(stat)) { - fs::remove(file, ec); - errorContext = - errorContext or reportErrorIf(ec.value() != 0, ProtocolStatusExt::FILE_DELETE, - "failed to remove temporary file: " + file.string()); - } - } - - // Make sure a file system at the destination has enough space - // to accommodate new files - // - // NOTE: this operation runs after cleaning up temporary files - - fs::space_info const space = fs::space(outDir, ec); - errorContext = - errorContext or - reportErrorIf(ec.value() != 0, ProtocolStatusExt::SPACE_REQ, - "failed to obtain space information at output folder: " + outDir.string()) or - reportErrorIf(space.available < totalBytes, ProtocolStatusExt::NO_SPACE, - "not enough free space available at output folder: " + outDir.string()); - } - if (errorContext.failed) { - setStatus(lock, ProtocolStatus::FAILED, errorContext.extendedStatus); - return true; - } - - // Begin copying files into the destination folder under their - // temporary names w/o acquiring the directory lock. - - for (auto&& file : files) { - fs::path const inFile = file2inFile[file]; - fs::path const tmpFile = file2tmpFile[file]; - - fs::copy_file(inFile, tmpFile, ec); - errorContext = errorContext or reportErrorIf(ec.value() != 0, ProtocolStatusExt::FILE_COPY, - "failed to copy file: " + inFile.string() + - " into: " + tmpFile.string()); - } - if (errorContext.failed) { - setStatus(lock, ProtocolStatus::FAILED, errorContext.extendedStatus); - return true; - } - - // Rename temporary files into the canonical ones - // Note that this operation changes the directory namespace in a way - // which may affect other users (like replica lookup operations, etc.). Hence we're - // acquiring the directory lock to guarantee a consistent view onto the folder. - - { - replica::Lock dataFolderLock(_mtxDataFolderOperations, context(__func__) + ":2"); - - // ATTENTION: as per ISO/IEC 9945 the file rename operation will - // remove empty files. Not sure if this should be treated - // in a special way? - - for (auto&& file : files) { - fs::path const inFile = file2inFile[file]; - fs::path const tmpFile = file2tmpFile[file]; - fs::path const outFile = file2outFile[file]; - - fs::rename(tmpFile, outFile, ec); - errorContext = errorContext or reportErrorIf(ec.value() != 0, ProtocolStatusExt::FILE_RENAME, - "failed to rename file: " + tmpFile.string()); - - fs::last_write_time(outFile, inFile2mtime[inFile], ec); - errorContext = errorContext or - reportErrorIf(ec.value() != 0, ProtocolStatusExt::FILE_MTIME, - "failed to set the mtime of output file: " + outFile.string()); - } - } - if (errorContext.failed) { - setStatus(lock, ProtocolStatus::FAILED, errorContext.extendedStatus); - return true; - } - - // For now (before finalizing the progress reporting protocol) just return - // the percentage of the total amount of data moved - - setStatus(lock, ProtocolStatus::SUCCESS); - return true; -} - -///////////////////////////////////////////////////////////////////// -///////////////////// WorkerReplicationRequestFS //////////////////// -///////////////////////////////////////////////////////////////////// - -WorkerReplicationRequestFS::Ptr WorkerReplicationRequestFS::create( - ServiceProvider::Ptr const& serviceProvider, string const& worker, string const& id, int priority, - ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, - ProtocolRequestReplicate const& request) { - return WorkerReplicationRequestFS::Ptr(new WorkerReplicationRequestFS( - serviceProvider, worker, id, priority, onExpired, requestExpirationIvalSec, request)); -} - -WorkerReplicationRequestFS::WorkerReplicationRequestFS(ServiceProvider::Ptr const& serviceProvider, - string const& worker, string const& id, int priority, - ExpirationCallbackType const& onExpired, - unsigned int requestExpirationIvalSec, - ProtocolRequestReplicate const& request) - : WorkerReplicationRequest(serviceProvider, worker, id, priority, onExpired, requestExpirationIvalSec, - request), - _databaseInfo(_serviceProvider->config()->databaseInfo(request.database())), - _initialized(false), - _files(FileUtils::partitionedFiles(_databaseInfo, request.chunk())), - _tmpFilePtr(nullptr), - _buf(0), - _bufSize(serviceProvider->config()->get("worker", "fs-buf-size-bytes")) {} - -WorkerReplicationRequestFS::~WorkerReplicationRequestFS() { - replica::Lock lock(_mtx, context(__func__)); - _releaseResources(lock); -} - -bool WorkerReplicationRequestFS::execute() { LOGS(_log, LOG_LVL_DEBUG, context(__func__) << " sourceWorkerHostPort: " << sourceWorkerHostPort() << " database: " << database() << " chunk: " << chunk()); replica::Lock lock(_mtx, context(__func__)); - - // Abort the operation right away if that's the case - - if (_status == ProtocolStatus::IS_CANCELLING) { - setStatus(lock, ProtocolStatus::CANCELLED); - throw WorkerRequestCancelled(); - } + checkIfCancelling(lock, __func__); // Obtain the list of files to be migrated // @@ -548,7 +287,7 @@ bool WorkerReplicationRequestFS::execute() { // Allocate the record buffer _buf = new uint8_t[_bufSize]; if (not _buf) { - throw runtime_error("WorkerReplicationRequestFS::" + string(__func__) + + throw runtime_error("WorkerReplicationRequest::" + string(__func__) + " buffer allocation failed"); } @@ -642,7 +381,7 @@ bool WorkerReplicationRequestFS::execute() { return _finalize(lock); } -bool WorkerReplicationRequestFS::_openFiles(replica::Lock const& lock) { +bool WorkerReplicationRequest::_openFiles(replica::Lock const& lock) { LOGS(_log, LOG_LVL_DEBUG, context(__func__) << " sourceWorkerHostPort: " << sourceWorkerHostPort() << " database: " << database() << " chunk: " << chunk() << " file: " << *_fileItr); @@ -683,7 +422,7 @@ bool WorkerReplicationRequestFS::_openFiles(replica::Lock const& lock) { return true; } -bool WorkerReplicationRequestFS::_finalize(replica::Lock const& lock) { +bool WorkerReplicationRequest::_finalize(replica::Lock const& lock) { LOGS(_log, LOG_LVL_DEBUG, context(__func__) << " sourceWorkerHostPort: " << sourceWorkerHostPort() << " database: " << database() << " chunk: " << chunk()); @@ -726,7 +465,7 @@ bool WorkerReplicationRequestFS::_finalize(replica::Lock const& lock) { return true; } -void WorkerReplicationRequestFS::_updateInfo(replica::Lock const& lock) { +void WorkerReplicationRequest::_updateInfo(replica::Lock const& lock) { size_t totalInSizeBytes = 0; size_t totalOutSizeBytes = 0; @@ -746,11 +485,11 @@ void WorkerReplicationRequestFS::_updateInfo(replica::Lock const& lock) { // Fill in the info on the chunk before finishing the operation - WorkerReplicationRequest::replicaInfo = + WorkerReplicationRequest::_replicaInfo = ReplicaInfo(status, worker(), database(), chunk(), util::TimeUtils::now(), fileInfoCollection); } -void WorkerReplicationRequestFS::_releaseResources(replica::Lock const& lock) { +void WorkerReplicationRequest::_releaseResources(replica::Lock const& lock) { // Drop a connection to the remote server _inFilePtr.reset(); diff --git a/src/replica/worker/WorkerReplicationRequest.h b/src/replica/worker/WorkerReplicationRequest.h index ab9d98219..5c6c74cc1 100644 --- a/src/replica/worker/WorkerReplicationRequest.h +++ b/src/replica/worker/WorkerReplicationRequest.h @@ -46,10 +46,7 @@ namespace lsst::qserv::replica { /** * Class WorkerReplicationRequest represents a context and a state of replication - * requests within the worker servers. It can also be used for testing the framework - * operation as its implementation won't make any changes to any files or databases. - * - * Real implementations of the request processing must derive from this class. + * requests within the worker servers. */ class WorkerReplicationRequest : public WorkerRequest { public: @@ -84,9 +81,8 @@ class WorkerReplicationRequest : public WorkerRequest { WorkerReplicationRequest(WorkerReplicationRequest const&) = delete; WorkerReplicationRequest& operator=(WorkerReplicationRequest const&) = delete; - ~WorkerReplicationRequest() override = default; - - // Trivial get methods + /// Non-trivial destructor is needed to relese resources + ~WorkerReplicationRequest() override; std::string const& database() const { return _request.database(); } unsigned int chunk() const { return _request.chunk(); } @@ -104,84 +100,11 @@ class WorkerReplicationRequest : public WorkerRequest { bool execute() override; -protected: +private: WorkerReplicationRequest(ServiceProvider::Ptr const& serviceProvider, std::string const& worker, std::string const& id, int priority, ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, ProtocolRequestReplicate const& request); - /// Result of the operation - ReplicaInfo replicaInfo; - -private: - // Input parameters - ProtocolRequestReplicate const _request; - - /// The cached connection parameters for the source worker (for error reporting and debugging). - std::string const _sourceWorkerHostPort; -}; - -/** - * Class WorkerReplicationRequestPOSIX provides an actual implementation for - * the replication requests based on the direct manipulation of files on - * a POSIX file system. - */ -class WorkerReplicationRequestPOSIX : public WorkerReplicationRequest { -public: - typedef std::shared_ptr Ptr; - - /// @see WorkerReplicationRequest::created() - static Ptr create(ServiceProvider::Ptr const& serviceProvider, std::string const& worker, - std::string const& id, int priority, ExpirationCallbackType const& onExpired, - unsigned int requestExpirationIvalSec, ProtocolRequestReplicate const& request); - - WorkerReplicationRequestPOSIX() = delete; - WorkerReplicationRequestPOSIX(WorkerReplicationRequestPOSIX const&) = delete; - WorkerReplicationRequestPOSIX& operator=(WorkerReplicationRequestPOSIX const&) = delete; - - ~WorkerReplicationRequestPOSIX() final = default; - - bool execute() final; - -protected: - WorkerReplicationRequestPOSIX(ServiceProvider::Ptr const& serviceProvider, std::string const& worker, - std::string const& id, int priority, - ExpirationCallbackType const& onExpired, - unsigned int requestExpirationIvalSec, - ProtocolRequestReplicate const& request); -}; - -/** - * Class WorkerReplicationRequestFS provides an actual implementation for - * the replication requests based on the direct manipulation of local files - * on a POSIX file system and for reading remote files using the built-into-worker - * simple file server. - */ -class WorkerReplicationRequestFS : public WorkerReplicationRequest { -public: - typedef std::shared_ptr Ptr; - - /// @see WorkerReplicationRequest::created() - static Ptr create(ServiceProvider::Ptr const& serviceProvider, std::string const& worker, - std::string const& id, int priority, ExpirationCallbackType const& onExpired, - unsigned int requestExpirationIvalSec, ProtocolRequestReplicate const& request); - - WorkerReplicationRequestFS() = delete; - WorkerReplicationRequestFS(WorkerReplicationRequestFS const&) = delete; - WorkerReplicationRequestFS& operator=(WorkerReplicationRequestFS const&) = delete; - - /// Destructor (non trivial one is needed to release resources) - ~WorkerReplicationRequestFS() final; - - bool execute() final; - -protected: - /// @see WorkerReplicationRequestFS::create() - WorkerReplicationRequestFS(ServiceProvider::Ptr const& serviceProvider, std::string const& worker, - std::string const& id, int priority, ExpirationCallbackType const& onExpired, - unsigned int requestExpirationIvalSec, - ProtocolRequestReplicate const& request); - -private: /** * Open files associated with the current state of iterator _fileItr. * @@ -209,18 +132,25 @@ class WorkerReplicationRequestFS : public WorkerReplicationRequest { * released to prevent unnecessary resource utilization. Note that * request objects can stay in the server's memory for an extended * period of time. - * * @param lock A lock to be acquired before calling this method */ void _releaseResources(replica::Lock const& lock); /** * Update file migration statistics - * * @param lock A lock to be acquired before calling this method */ void _updateInfo(replica::Lock const& lock); + // Input parameters + ProtocolRequestReplicate const _request; + + /// Result of the operation + ReplicaInfo _replicaInfo; + + /// The cached connection parameters for the source worker (for error reporting and debugging). + std::string const _sourceWorkerHostPort; + /// Cached descriptor of the database obtained from the Configuration DatabaseInfo const _databaseInfo; diff --git a/src/replica/worker/WorkerRequest.cc b/src/replica/worker/WorkerRequest.cc index 245c27573..ce7bad3e8 100644 --- a/src/replica/worker/WorkerRequest.cc +++ b/src/replica/worker/WorkerRequest.cc @@ -95,6 +95,19 @@ WorkerRequest::~WorkerRequest() { dispose(); } +void WorkerRequest::checkIfCancelling(replica::Lock const& lock, string const& func) { + switch (status()) { + case ProtocolStatus::IN_PROGRESS: + break; + case ProtocolStatus::IS_CANCELLING: + setStatus(lock, ProtocolStatus::CANCELLED); + throw WorkerRequestCancelled(); + default: + throw logic_error(context(func) + + " not allowed while in status: " + WorkerRequest::status2string(status())); + } +} + WorkerRequest::ErrorContext WorkerRequest::reportErrorIf(bool errorCondition, ProtocolStatusExt extendedStatus, string const& errorMsg) { @@ -143,26 +156,13 @@ void WorkerRequest::start() { bool WorkerRequest::execute() { LOGS(_log, LOG_LVL_TRACE, context(__func__)); replica::Lock lock(_mtx, context(__func__)); + checkIfCancelling(lock, __func__); // Simulate request 'processing' for some maximum duration of time (milliseconds) // while making a progress through increments of random duration of time. // Success/failure modes will be also simulated using the corresponding generator. - - switch (status()) { - case ProtocolStatus::IN_PROGRESS: - break; - case ProtocolStatus::IS_CANCELLING: - setStatus(lock, ProtocolStatus::CANCELLED); - throw WorkerRequestCancelled(); - default: - throw logic_error(context(__func__) + - " not allowed while in status: " + WorkerRequest::status2string(status())); - } - _durationMillisec += ::incrementIvalMillisec.wait(); - if (_durationMillisec < ::maxDurationMillisec) return false; - setStatus(lock, ::successRateGenerator.success() ? ProtocolStatus::SUCCESS : ProtocolStatus::FAILED); return true; } diff --git a/src/replica/worker/WorkerRequest.h b/src/replica/worker/WorkerRequest.h index 362ba3474..7c7d3ee34 100644 --- a/src/replica/worker/WorkerRequest.h +++ b/src/replica/worker/WorkerRequest.h @@ -120,17 +120,11 @@ class WorkerRequest : public std::enable_shared_from_this { * of the request. * * This method is required to be called while the request state is ProtocolStatus::IN_PROGRESS. - * - * The method will throw custom exception WorkerRequestCancelled when - * it detects a cancellation request. - * - * The default implementation of the method will do nothing, just simulate - * processing. This can be serve as a foundation for various tests - * of this framework. + * The method will throw custom exception WorkerRequestCancelled when it detects a cancellation request. * * @return result of the operation as explained above */ - virtual bool execute(); + virtual bool execute() = 0; /** * Cancel execution of the request. @@ -139,12 +133,9 @@ class WorkerRequest : public std::enable_shared_from_this { * the request. The default (the base class's implementation) assumes * the following transitions: * - * ProtocolStatus::CREATED or ProtocolStatus::CANCELLED - transition to state - ProtocolStatus::CANCELLED - * ProtocolStatus::IN_PROGRESS or ProtocolStatus::IS_CANCELLING - transition to state - ProtocolStatus::IS_CANCELLING - * other - throwing std::logic_error - + * {ProtocolStatus::CREATED,ProtocolStatus::CANCELLED} -> ProtocolStatus::CANCELLED + * {ProtocolStatus::IN_PROGRESS,ProtocolStatus::IS_CANCELLING} -> ProtocolStatus::IS_CANCELLING + * {*} -> throw std::logic_error */ virtual void cancel(); @@ -156,19 +147,16 @@ class WorkerRequest : public std::enable_shared_from_this { * the request. The default (the base class's implementation) assumes * the following transitions: * - * ProtocolStatus::CREATED or ProtocolStatus::IN_PROGRESS - transition to ProtocolStatus::CREATED - * ProtocolStatus::IS_CANCELLING - transition to ProtocolStatus::CANCELLED and - * throwing WorkerRequestCancelled other - throwing - * std::logic_error + * {ProtocolStatus::CREATED, ProtocolStatus::IN_PROGRESS} -> ProtocolStatus::CREATED + * {ProtocolStatus::IS_CANCELLING} -> ProtocolStatus::CANCELLED -> throw WorkerRequestCancelled + * {*} -> throw std::logic_error */ virtual void rollback(); /** * This method is called from *ANY* initial state in order to turn * the request back into the initial ProtocolStatus::CREATED. - * - * @param func (optional) the name of a function/method which requested - * the context string + * @param func (optional) the name of a function/method which requested the context string */ void stop(); @@ -214,6 +202,21 @@ class WorkerRequest : public std::enable_shared_from_this { ExpirationCallbackType const& onExpired = nullptr, unsigned int requestExpirationIvalSec = 0); + /** + * The method is used to check if the request is entered the cancellation state. + * The implementation assumes the following transitions: + * + * {ProtocolStatus::IN_PROGRESS} -> ProtocolStatus::IN_PROGRESS + * {ProtocolStatus::IS_CANCELLING} -> ProtocolStatus::CANCELLED -> throw WorkerRequestCancelled + * {*} -> throw std::logic_error + * + * @param lock a lock on _mtx which acquired before calling this method + * @param func the name of a function/method which called the method + * @throws WorkerRequestCancelled if the request is being cancelled. + * @throws std::logic_error if the state is not as expected. + */ + void checkIfCancelling(replica::Lock const& lock, std::string const& func); + /** Set the status * * @note this method needs to be called within a thread-safe context @@ -343,15 +346,9 @@ class WorkerRequest : public std::enable_shared_from_this { struct WorkerRequestCompare { /** * Sort requests by their priorities - * - * @param lhs - * pointer to a request on the left side of a logical comparison - * - * @param rhs - * pointer to a request on the right side of a logical comparison - * - * @return - * 'true' if the priority of 'lhs' is strictly less than the one of 'rhs' + * @param lhs pointer to a request on the left side of a logical comparison + * @param rhs pointer to a request on the right side of a logical comparison + * @return 'true' if the priority of 'lhs' is strictly less than the one of 'rhs' */ bool operator()(WorkerRequest::Ptr const& lhs, WorkerRequest::Ptr const& rhs) const { return lhs->priority() < rhs->priority(); diff --git a/src/replica/worker/WorkerRequestFactory.cc b/src/replica/worker/WorkerRequestFactory.cc deleted file mode 100644 index 251ddd98f..000000000 --- a/src/replica/worker/WorkerRequestFactory.cc +++ /dev/null @@ -1,388 +0,0 @@ -/* - * LSST Data Management System - * - * This product includes software developed by the - * LSST Project (http://www.lsst.org/). - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the LSST License Statement and - * the GNU General Public License along with this program. If not, - * see . - */ - -// Class header -#include "replica/worker/WorkerRequestFactory.h" - -// System headers -#include - -// Qserv headers -#include "replica/mysql/DatabaseMySQL.h" -#include "replica/config/Configuration.h" -#include "replica/services/ServiceProvider.h" -#include "replica/worker/WorkerDeleteRequest.h" -#include "replica/worker/WorkerEchoRequest.h" -#include "replica/worker/WorkerFindAllRequest.h" -#include "replica/worker/WorkerFindRequest.h" -#include "replica/worker/WorkerDirectorIndexRequest.h" -#include "replica/worker/WorkerReplicationRequest.h" -#include "replica/worker/WorkerSqlRequest.h" - -// LSST headers -#include "lsst/log/Log.h" - -using namespace std; - -namespace { - -LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.WorkerRequestFactory"); - -} // namespace - -namespace lsst::qserv::replica { - -/////////////////////////////////////////////////////////////////// -///////////////////// WorkerRequestFactoryBase //////////////////// -/////////////////////////////////////////////////////////////////// - -WorkerRequestFactoryBase::WorkerRequestFactoryBase(ServiceProvider::Ptr const& serviceProvider, - database::mysql::ConnectionPool::Ptr const& connectionPool) - : _serviceProvider(serviceProvider), _connectionPool(connectionPool) {} - -/////////////////////////////////////////////////////////////////// -///////////////////// WorkerRequestFactoryTest //////////////////// -/////////////////////////////////////////////////////////////////// - -/** - * Class WorkerRequestFactory is a factory class constructing the test versions - * of the request objects which make no persistent side effects. - */ -class WorkerRequestFactoryTest : public WorkerRequestFactoryBase { -public: - WorkerRequestFactoryTest() = delete; - WorkerRequestFactoryTest(WorkerRequestFactoryTest const&) = delete; - WorkerRequestFactoryTest& operator=(WorkerRequestFactoryTest const&) = delete; - - WorkerRequestFactoryTest(ServiceProvider::Ptr const& serviceProvider, - database::mysql::ConnectionPool::Ptr const& connectionPool) - : WorkerRequestFactoryBase(serviceProvider, connectionPool) {} - - ~WorkerRequestFactoryTest() final = default; - - string technology() const final { return "TEST"; } - - WorkerReplicationRequest::Ptr createReplicationRequest( - string const& worker, string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, - ProtocolRequestReplicate const& request) const final { - return WorkerReplicationRequest::create(_serviceProvider, worker, id, priority, onExpired, - requestExpirationIvalSec, request); - } - - WorkerDeleteRequest::Ptr createDeleteRequest(string const& worker, string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, - unsigned int requestExpirationIvalSec, - ProtocolRequestDelete const& request) const final { - return WorkerDeleteRequest::create(_serviceProvider, worker, id, priority, onExpired, - requestExpirationIvalSec, request); - } - - WorkerFindRequest::Ptr createFindRequest(string const& worker, string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, - unsigned int requestExpirationIvalSec, - ProtocolRequestFind const& request) const final { - return WorkerFindRequest::create(_serviceProvider, worker, id, priority, onExpired, - requestExpirationIvalSec, request); - } - - WorkerFindAllRequest::Ptr createFindAllRequest(string const& worker, string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, - unsigned int requestExpirationIvalSec, - ProtocolRequestFindAll const& request) const final { - return WorkerFindAllRequest::create(_serviceProvider, worker, id, priority, onExpired, - requestExpirationIvalSec, request); - } - - WorkerEchoRequest::Ptr createEchoRequest(string const& worker, string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, - unsigned int requestExpirationIvalSec, - ProtocolRequestEcho const& request) const final { - return WorkerEchoRequest::create(_serviceProvider, worker, id, priority, onExpired, - requestExpirationIvalSec, request); - } - - WorkerSqlRequest::Ptr createSqlRequest(string const& worker, string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, - unsigned int requestExpirationIvalSec, - ProtocolRequestSql const& request) const final { - return WorkerSqlRequest::create(_serviceProvider, worker, id, priority, onExpired, - requestExpirationIvalSec, request); - } - - WorkerDirectorIndexRequest::Ptr createDirectorIndexRequest( - string const& worker, string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, - ProtocolRequestDirectorIndex const& request) const final { - return WorkerDirectorIndexRequest::create(_serviceProvider, _connectionPool, worker, id, priority, - onExpired, requestExpirationIvalSec, request); - } -}; - -//////////////////////////////////////////////////////////////////// -///////////////////// WorkerRequestFactoryPOSIX //////////////////// -//////////////////////////////////////////////////////////////////// - -/** - * Class WorkerRequestFactoryPOSIX creates request objects based on the direct - * manipulation of files on a POSIX file system. - */ -class WorkerRequestFactoryPOSIX : public WorkerRequestFactoryBase { -public: - WorkerRequestFactoryPOSIX() = delete; - WorkerRequestFactoryPOSIX(WorkerRequestFactoryPOSIX const&) = delete; - WorkerRequestFactoryPOSIX& operator=(WorkerRequestFactoryPOSIX const&) = delete; - - WorkerRequestFactoryPOSIX(ServiceProvider::Ptr const& serviceProvider, - database::mysql::ConnectionPool::Ptr const& connectionPool) - : WorkerRequestFactoryBase(serviceProvider, connectionPool) {} - - ~WorkerRequestFactoryPOSIX() final = default; - - string technology() const final { return "POSIX"; } - - WorkerReplicationRequest::Ptr createReplicationRequest( - string const& worker, string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, - ProtocolRequestReplicate const& request) const final { - return WorkerReplicationRequestPOSIX::create(_serviceProvider, worker, id, priority, onExpired, - requestExpirationIvalSec, request); - } - - WorkerDeleteRequest::Ptr createDeleteRequest(string const& worker, string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, - unsigned int requestExpirationIvalSec, - ProtocolRequestDelete const& request) const final { - return WorkerDeleteRequestPOSIX::create(_serviceProvider, worker, id, priority, onExpired, - requestExpirationIvalSec, request); - } - - WorkerFindRequest::Ptr createFindRequest(string const& worker, string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, - unsigned int requestExpirationIvalSec, - ProtocolRequestFind const& request) const final { - return WorkerFindRequestPOSIX::create(_serviceProvider, worker, id, priority, onExpired, - requestExpirationIvalSec, request); - } - - WorkerFindAllRequest::Ptr createFindAllRequest(string const& worker, string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, - unsigned int requestExpirationIvalSec, - ProtocolRequestFindAll const& request) const final { - return WorkerFindAllRequestPOSIX::create(_serviceProvider, worker, id, priority, onExpired, - requestExpirationIvalSec, request); - } - - WorkerEchoRequest::Ptr createEchoRequest(string const& worker, string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, - unsigned int requestExpirationIvalSec, - ProtocolRequestEcho const& request) const final { - return WorkerEchoRequestPOSIX::create(_serviceProvider, worker, id, priority, onExpired, - requestExpirationIvalSec, request); - } - - WorkerSqlRequest::Ptr createSqlRequest(string const& worker, string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, - unsigned int requestExpirationIvalSec, - ProtocolRequestSql const& request) const final { - return WorkerSqlRequestPOSIX::create(_serviceProvider, worker, id, priority, onExpired, - requestExpirationIvalSec, request); - } - - WorkerDirectorIndexRequest::Ptr createDirectorIndexRequest( - string const& worker, string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, - ProtocolRequestDirectorIndex const& request) const final { - return WorkerDirectorIndexRequestPOSIX::create(_serviceProvider, _connectionPool, worker, id, - priority, onExpired, requestExpirationIvalSec, - request); - } -}; - -///////////////////////////////////////////////////////////////// -///////////////////// WorkerRequestFactoryFS //////////////////// -///////////////////////////////////////////////////////////////// - -/** - * Class WorkerRequestFactoryFS creates request objects based on the direct - * manipulation of local files on a POSIX file system and for reading remote - * files using the built-into-worker simple file server. - */ -class WorkerRequestFactoryFS : public WorkerRequestFactoryBase { -public: - WorkerRequestFactoryFS() = delete; - WorkerRequestFactoryFS(WorkerRequestFactoryFS const&) = delete; - WorkerRequestFactoryFS& operator=(WorkerRequestFactoryFS const&) = delete; - - WorkerRequestFactoryFS(ServiceProvider::Ptr const& serviceProvider, - database::mysql::ConnectionPool::Ptr const& connectionPool) - : WorkerRequestFactoryBase(serviceProvider, connectionPool) {} - - ~WorkerRequestFactoryFS() final = default; - - string technology() const final { return "FS"; } - - WorkerReplicationRequest::Ptr createReplicationRequest( - string const& worker, string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, - ProtocolRequestReplicate const& request) const final { - return WorkerReplicationRequestFS::create(_serviceProvider, worker, id, priority, onExpired, - requestExpirationIvalSec, request); - } - - WorkerDeleteRequest::Ptr createDeleteRequest(string const& worker, string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, - unsigned int requestExpirationIvalSec, - ProtocolRequestDelete const& request) const final { - return WorkerDeleteRequestFS::create(_serviceProvider, worker, id, priority, onExpired, - requestExpirationIvalSec, request); - } - - WorkerFindRequest::Ptr createFindRequest(string const& worker, string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, - unsigned int requestExpirationIvalSec, - ProtocolRequestFind const& request) const final { - return WorkerFindRequestFS::create(_serviceProvider, worker, id, priority, onExpired, - requestExpirationIvalSec, request); - } - - WorkerFindAllRequest::Ptr createFindAllRequest(string const& worker, string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, - unsigned int requestExpirationIvalSec, - ProtocolRequestFindAll const& request) const final { - return WorkerFindAllRequestFS::create(_serviceProvider, worker, id, priority, onExpired, - requestExpirationIvalSec, request); - } - - WorkerEchoRequest::Ptr createEchoRequest(string const& worker, string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, - unsigned int requestExpirationIvalSec, - ProtocolRequestEcho const& request) const final { - return WorkerEchoRequestFS::create(_serviceProvider, worker, id, priority, onExpired, - requestExpirationIvalSec, request); - } - - WorkerSqlRequest::Ptr createSqlRequest(string const& worker, string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, - unsigned int requestExpirationIvalSec, - ProtocolRequestSql const& request) const final { - return WorkerSqlRequestFS::create(_serviceProvider, worker, id, priority, onExpired, - requestExpirationIvalSec, request); - } - - WorkerDirectorIndexRequest::Ptr createDirectorIndexRequest( - string const& worker, string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, - ProtocolRequestDirectorIndex const& request) const final { - return WorkerDirectorIndexRequestFS::create(_serviceProvider, _connectionPool, worker, id, priority, - onExpired, requestExpirationIvalSec, request); - } -}; - -/////////////////////////////////////////////////////////////// -///////////////////// WorkerRequestFactory //////////////////// -/////////////////////////////////////////////////////////////// - -WorkerRequestFactory::WorkerRequestFactory(ServiceProvider::Ptr const& serviceProvider, - database::mysql::ConnectionPool::Ptr const& connectionPool, - string const& technology) - : WorkerRequestFactoryBase(serviceProvider, connectionPool) { - string const finalTechnology = - technology.empty() ? serviceProvider->config()->get("worker", "technology") : technology; - - if (finalTechnology == "TEST") - _ptr = new WorkerRequestFactoryTest(serviceProvider, connectionPool); - else if (finalTechnology == "POSIX") - _ptr = new WorkerRequestFactoryPOSIX(serviceProvider, connectionPool); - else if (finalTechnology == "FS") - _ptr = new WorkerRequestFactoryFS(serviceProvider, connectionPool); - else { - throw invalid_argument("WorkerRequestFactory::" + string(__func__) + " unknown technology: '" + - finalTechnology); - } -} - -WorkerReplicationRequest::Ptr WorkerRequestFactory::createReplicationRequest( - string const& worker, string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, - ProtocolRequestReplicate const& request) const { - auto ptr = _ptr->createReplicationRequest(worker, id, priority, onExpired, requestExpirationIvalSec, - request); - ptr->init(); - return ptr; -} - -WorkerDeleteRequest::Ptr WorkerRequestFactory::createDeleteRequest( - string const& worker, string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, - ProtocolRequestDelete const& request) const { - auto ptr = _ptr->createDeleteRequest(worker, id, priority, onExpired, requestExpirationIvalSec, request); - ptr->init(); - return ptr; -} - -WorkerFindRequest::Ptr WorkerRequestFactory::createFindRequest( - string const& worker, string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, - ProtocolRequestFind const& request) const { - auto ptr = _ptr->createFindRequest(worker, id, priority, onExpired, requestExpirationIvalSec, request); - ptr->init(); - return ptr; -} - -WorkerFindAllRequest::Ptr WorkerRequestFactory::createFindAllRequest( - string const& worker, string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, - ProtocolRequestFindAll const& request) const { - auto ptr = _ptr->createFindAllRequest(worker, id, priority, onExpired, requestExpirationIvalSec, request); - ptr->init(); - return ptr; -} - -WorkerEchoRequest::Ptr WorkerRequestFactory::createEchoRequest( - string const& worker, string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, - ProtocolRequestEcho const& request) const { - auto ptr = _ptr->createEchoRequest(worker, id, priority, onExpired, requestExpirationIvalSec, request); - ptr->init(); - return ptr; -} - -WorkerSqlRequest::Ptr WorkerRequestFactory::createSqlRequest( - string const& worker, string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, - ProtocolRequestSql const& request) const { - auto ptr = _ptr->createSqlRequest(worker, id, priority, onExpired, requestExpirationIvalSec, request); - ptr->init(); - return ptr; -} - -WorkerDirectorIndexRequest::Ptr WorkerRequestFactory::createDirectorIndexRequest( - string const& worker, string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, - ProtocolRequestDirectorIndex const& request) const { - auto ptr = _ptr->createDirectorIndexRequest(worker, id, priority, onExpired, requestExpirationIvalSec, - request); - ptr->init(); - return ptr; -} - -} // namespace lsst::qserv::replica diff --git a/src/replica/worker/WorkerRequestFactory.h b/src/replica/worker/WorkerRequestFactory.h deleted file mode 100644 index 0b254a8f8..000000000 --- a/src/replica/worker/WorkerRequestFactory.h +++ /dev/null @@ -1,206 +0,0 @@ -/* - * LSST Data Management System - * - * This product includes software developed by the - * LSST Project (http://www.lsst.org/). - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the LSST License Statement and - * the GNU General Public License along with this program. If not, - * see . - */ -#ifndef LSST_QSERV_REPLICA_WORKERREQUESTFACTORY_H -#define LSST_QSERV_REPLICA_WORKERREQUESTFACTORY_H - -// System headers -#include -#include -#include - -// Qserv headers -#include "replica/services/ServiceProvider.h" -#include "replica/worker/WorkerRequest.h" - -// Forward declarations -namespace lsst::qserv::replica { -class ProtocolRequestDirectorIndex; -class ProtocolRequestSql; -class WorkerDeleteRequest; -class WorkerEchoRequest; -class WorkerFindAllRequest; -class WorkerFindRequest; -class WorkerDirectorIndexRequest; -class WorkerSqlRequest; -class WorkerReplicationRequest; -namespace database::mysql { -class ConnectionPool; -} // namespace database::mysql -} // namespace lsst::qserv::replica - -// This header declarations -namespace lsst::qserv::replica { - -/** - * Class WorkerRequestFactoryBase is an abstract base class for a family of - * various implementations of factories for creating request objects. - */ -class WorkerRequestFactoryBase { -public: - WorkerRequestFactoryBase() = delete; - WorkerRequestFactoryBase(WorkerRequestFactoryBase const&) = delete; - WorkerRequestFactoryBase& operator=(WorkerRequestFactoryBase const&) = delete; - - virtual ~WorkerRequestFactoryBase() = default; - - /// @return the name of a technology the factory is based upon - virtual std::string technology() const = 0; - - /// @see class WorkerReplicationRequest - virtual std::shared_ptr createReplicationRequest( - std::string const& worker, std::string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, - ProtocolRequestReplicate const& request) const = 0; - - /// @see class WorkerDeleteRequest - virtual std::shared_ptr createDeleteRequest( - std::string const& worker, std::string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, - ProtocolRequestDelete const& request) const = 0; - - /// @see class WorkerFindRequest - virtual std::shared_ptr createFindRequest( - std::string const& worker, std::string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, - ProtocolRequestFind const& request) const = 0; - - /// @see class WorkerFindAllRequest - virtual std::shared_ptr createFindAllRequest( - std::string const& worker, std::string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, - ProtocolRequestFindAll const& request) const = 0; - - /// @see class WorkerEchoRequest - virtual std::shared_ptr createEchoRequest( - std::string const& worker, std::string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, - ProtocolRequestEcho const& request) const = 0; - - /// @see class WorkerSqlRequest - virtual std::shared_ptr createSqlRequest( - std::string const& worker, std::string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, - ProtocolRequestSql const& request) const = 0; - - /// @see class WorkerDirectorIndexRequest - virtual std::shared_ptr createDirectorIndexRequest( - std::string const& worker, std::string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, - ProtocolRequestDirectorIndex const& request) const = 0; - -protected: - /** - * @param serviceProvider a provider of various services - * @param connectionPool a pool of persistent database connections - */ - WorkerRequestFactoryBase(ServiceProvider::Ptr const& serviceProvider, - std::shared_ptr const& connectionPool); - -protected: - ServiceProvider::Ptr const _serviceProvider; - std::shared_ptr const _connectionPool; -}; - -/** - * Class WorkerRequestFactory is a proxy class which is constructed with - * a choice of a specific implementation of the factory. - */ -class WorkerRequestFactory : public WorkerRequestFactoryBase { -public: - WorkerRequestFactory() = delete; - WorkerRequestFactory(WorkerRequestFactory const&) = delete; - WorkerRequestFactory& operator=(WorkerRequestFactory const&) = delete; - - /** - * The constructor of the class. - * - * The technology name must be valid. Otherwise std::invalid_argument will - * be thrown. If the default value of the parameter is assumed then the one - * from the current configuration will be assumed. - * - * This is the list of technologies which are presently supported: - * - * 'TEST' - request objects which are meant to be used for testing the framework - * operation w/o making any persistent side effects. - * - * 'POSIX' - request objects based on the direct manipulation of files - * on a POSIX file system. - * - * 'FS' - request objects based on the direct manipulation of local files - * on a POSIX file system and for reading remote files using - * the built-into-worker simple file server. - * - * @param serviceProvider provider of various services (including configurations) - * @param connectionPool a pool of persistent database connections - * @param technology (optional) the name of a technology - */ - WorkerRequestFactory(ServiceProvider::Ptr const& serviceProvider, - std::shared_ptr const& connectionPool, - std::string const& technology = std::string()); - - ~WorkerRequestFactory() final { delete _ptr; } - - std::string technology() const final { return _ptr->technology(); } - - std::shared_ptr createReplicationRequest( - std::string const& worker, std::string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, - ProtocolRequestReplicate const& request) const final; - - std::shared_ptr createDeleteRequest( - std::string const& worker, std::string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, - ProtocolRequestDelete const& request) const final; - - std::shared_ptr createFindRequest( - std::string const& worker, std::string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, - ProtocolRequestFind const& request) const final; - - std::shared_ptr createFindAllRequest( - std::string const& worker, std::string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, - ProtocolRequestFindAll const& request) const final; - - std::shared_ptr createEchoRequest( - std::string const& worker, std::string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, - ProtocolRequestEcho const& request) const final; - - std::shared_ptr createSqlRequest(std::string const& worker, std::string const& id, - int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, - unsigned int requestExpirationIvalSec, - ProtocolRequestSql const& request) const final; - - std::shared_ptr createDirectorIndexRequest( - std::string const& worker, std::string const& id, int priority, - WorkerRequest::ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, - ProtocolRequestDirectorIndex const& request) const final; - -protected: - /// Pointer to the final implementation of the factory - WorkerRequestFactoryBase const* _ptr; -}; - -} // namespace lsst::qserv::replica - -#endif // LSST_QSERV_REPLICA_WORKERREQUESTFACTORY_H diff --git a/src/replica/worker/WorkerServer.cc b/src/replica/worker/WorkerServer.cc index 02a79dc93..7caa9c3d9 100644 --- a/src/replica/worker/WorkerServer.cc +++ b/src/replica/worker/WorkerServer.cc @@ -47,15 +47,14 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.WorkerServer"); namespace lsst::qserv::replica { WorkerServer::Ptr WorkerServer::create(ServiceProvider::Ptr const& serviceProvider, - WorkerRequestFactory& requestFactory, string const& workerName) { - return WorkerServer::Ptr(new WorkerServer(serviceProvider, requestFactory, workerName)); + string const& workerName) { + return WorkerServer::Ptr(new WorkerServer(serviceProvider, workerName)); } -WorkerServer::WorkerServer(ServiceProvider::Ptr const& serviceProvider, WorkerRequestFactory& requestFactory, - string const& workerName) +WorkerServer::WorkerServer(ServiceProvider::Ptr const& serviceProvider, string const& workerName) : _serviceProvider(serviceProvider), _workerName(workerName), - _processor(WorkerProcessor::create(serviceProvider, requestFactory, workerName)), + _processor(WorkerProcessor::create(serviceProvider, workerName)), _acceptor(_io_service, boost::asio::ip::tcp::endpoint( boost::asio::ip::tcp::v4(), serviceProvider->config()->get("worker", "svc-port"))) { diff --git a/src/replica/worker/WorkerServer.h b/src/replica/worker/WorkerServer.h index 0897fb10f..6960b09e6 100644 --- a/src/replica/worker/WorkerServer.h +++ b/src/replica/worker/WorkerServer.h @@ -32,11 +32,6 @@ #include "replica/worker/WorkerProcessor.h" #include "replica/worker/WorkerServerConnection.h" -// Forward declarations -namespace lsst::qserv::replica { -class WorkerRequestFactory; -} // namespace lsst::qserv::replica - // This header declarations namespace lsst::qserv::replica { @@ -59,18 +54,13 @@ class WorkerServer : public std::enable_shared_from_this { * provider is needed to access the Configuration of a setup * and for validating the input parameters * - * @param requestFactory - * the factory of requests which will be created by the server - * and forwarded to the request processor for actual execution. - * * @param workerName * the name of a worker this instance represents * * @return * pointer to the new object created by the factory */ - static Ptr create(ServiceProvider::Ptr const& serviceProvider, WorkerRequestFactory& requestFactory, - std::string const& workerName); + static Ptr create(ServiceProvider::Ptr const& serviceProvider, std::string const& workerName); // Default construction and copy semantics are prohibited @@ -104,8 +94,7 @@ class WorkerServer : public std::enable_shared_from_this { private: /// @see WorkerServer::create() - WorkerServer(ServiceProvider::Ptr const& serviceProvider, WorkerRequestFactory& requestFactory, - std::string const& workerName); + WorkerServer(ServiceProvider::Ptr const& serviceProvider, std::string const& workerName); /** * Begin (asynchronously) accepting connection requests. diff --git a/src/replica/worker/WorkerSqlRequest.cc b/src/replica/worker/WorkerSqlRequest.cc index 6e294f028..213e654a0 100644 --- a/src/replica/worker/WorkerSqlRequest.cc +++ b/src/replica/worker/WorkerSqlRequest.cc @@ -51,8 +51,10 @@ WorkerSqlRequest::Ptr WorkerSqlRequest::create(ServiceProvider::Ptr const& servi ExpirationCallbackType const& onExpired, unsigned int requestExpirationIvalSec, ProtocolRequestSql const& request) { - return WorkerSqlRequest::Ptr(new WorkerSqlRequest(serviceProvider, worker, id, priority, onExpired, - requestExpirationIvalSec, request)); + auto ptr = WorkerSqlRequest::Ptr(new WorkerSqlRequest(serviceProvider, worker, id, priority, onExpired, + requestExpirationIvalSec, request)); + ptr->init(); + return ptr; } WorkerSqlRequest::WorkerSqlRequest(ServiceProvider::Ptr const& serviceProvider, string const& worker, @@ -83,18 +85,10 @@ void WorkerSqlRequest::setInfo(ProtocolResponseSql& response) const { bool WorkerSqlRequest::execute() { string const context_ = "WorkerSqlRequest::" + context(__func__); LOGS(_log, LOG_LVL_DEBUG, context_); + replica::Lock lock(_mtx, context_); + checkIfCancelling(lock, __func__); - switch (status()) { - case ProtocolStatus::IN_PROGRESS: - break; - case ProtocolStatus::IS_CANCELLING: - setStatus(lock, ProtocolStatus::CANCELLED); - throw WorkerRequestCancelled(); - default: - throw logic_error(context_ + - " not allowed while in state: " + WorkerRequest::status2string(status())); - } try { // Pre-create the default result-set message before any operations with // the database service. This is needed to report errors. diff --git a/src/replica/worker/WorkerSqlRequest.h b/src/replica/worker/WorkerSqlRequest.h index eb7afcc16..3df4b65d5 100644 --- a/src/replica/worker/WorkerSqlRequest.h +++ b/src/replica/worker/WorkerSqlRequest.h @@ -161,12 +161,6 @@ class WorkerSqlRequest : public WorkerRequest { mutable ProtocolResponseSql _response; }; -/// Class WorkerSqlRequest provides an actual implementation -typedef WorkerSqlRequest WorkerSqlRequestFS; - -/// Class WorkerSqlRequest provides an actual implementation -typedef WorkerSqlRequest WorkerSqlRequestPOSIX; - } // namespace lsst::qserv::replica #endif // LSST_QSERV_REPLICA_WORKERSQLREQUEST_H