From 176fc2d754a94c070a8e4555164a661d0ee2cea3 Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Fri, 1 Nov 2024 13:17:52 -0700 Subject: [PATCH 1/6] Incremented the version number of the REST API to 39 --- src/admin/python/lsst/qserv/admin/replicationInterface.py | 2 +- src/http/ChttpMetaModule.cc | 2 +- src/http/MetaModule.cc | 2 +- src/www/qserv/js/Common.js | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/admin/python/lsst/qserv/admin/replicationInterface.py b/src/admin/python/lsst/qserv/admin/replicationInterface.py index 8004d74c4..369e9f0dd 100644 --- a/src/admin/python/lsst/qserv/admin/replicationInterface.py +++ b/src/admin/python/lsst/qserv/admin/replicationInterface.py @@ -230,7 +230,7 @@ def __init__( self.repl_ctrl = urlparse(repl_ctrl_uri) self.auth_key = auth_key self.admin_auth_key = admin_auth_key - self.repl_api_version = 38 + self.repl_api_version = 39 _log.debug(f"ReplicationInterface %s", self.repl_ctrl) def version(self) -> str: diff --git a/src/http/ChttpMetaModule.cc b/src/http/ChttpMetaModule.cc index 5a592b2c6..7495fe206 100644 --- a/src/http/ChttpMetaModule.cc +++ b/src/http/ChttpMetaModule.cc @@ -37,7 +37,7 @@ string const adminAuthKey; namespace lsst::qserv::http { -unsigned int const ChttpMetaModule::version = 38; +unsigned int const ChttpMetaModule::version = 39; void ChttpMetaModule::process(string const& context, nlohmann::json const& info, httplib::Request const& req, httplib::Response& resp, string const& subModuleName) { diff --git a/src/http/MetaModule.cc b/src/http/MetaModule.cc index f9f0be36c..2c02d2a23 100644 --- a/src/http/MetaModule.cc +++ b/src/http/MetaModule.cc @@ -37,7 +37,7 @@ string const adminAuthKey; namespace lsst::qserv::http { -unsigned int const MetaModule::version = 38; +unsigned int const MetaModule::version = 39; void MetaModule::process(string const& context, nlohmann::json const& info, shared_ptr const& req, shared_ptr const& resp, diff --git a/src/www/qserv/js/Common.js 
b/src/www/qserv/js/Common.js index 70c1d7c0f..024153425 100644 --- a/src/www/qserv/js/Common.js +++ b/src/www/qserv/js/Common.js @@ -6,7 +6,7 @@ function(sqlFormatter, _) { class Common { - static RestAPIVersion = 38; + static RestAPIVersion = 39; static query2text(query, expanded) { if (expanded) { if (query.length > Common._max_expanded_length) { From 998cf850fda20d688fb31d08668765a2576ea4b8 Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Mon, 4 Nov 2024 15:51:09 -0800 Subject: [PATCH 2/6] Added the post-itest log grabber for the HTTP frontend in GHA CI --- .github/workflows/ci.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9d2b196e7..e8afa81b1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -356,6 +356,10 @@ jobs: if: always() run: docker logs ${USER}_czar_proxy_1 + - name: Czar HTTP Frontend Log + if: always() + run: docker logs ${USER}_czar_http_1 + - name: Czar CMSD Log if: always() run: docker logs ${USER}_czar_cmsd_1 From 093d95ee2dc177322a32c2eacb76584844955fed Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Fri, 1 Nov 2024 17:18:16 -0700 Subject: [PATCH 3/6] Extended http Client class to allow posting multipart/post-data formatted requests The new version of the class allows sending file attachments in the streaming mode, along with parts. 
--- src/http/AsyncReq.cc | 1 + src/http/Client.cc | 78 +++++++++++++++++++++++++++++++++++--------- src/http/Client.h | 45 +++++++++++++++++++++++++ src/http/Method.cc | 4 +++ src/http/Method.h | 4 +-- 5 files changed, 115 insertions(+), 17 deletions(-) diff --git a/src/http/AsyncReq.cc b/src/http/AsyncReq.cc index 611ed8f79..5bbec4c28 100644 --- a/src/http/AsyncReq.cc +++ b/src/http/AsyncReq.cc @@ -50,6 +50,7 @@ boost::beast::http::verb method2verb(lsst::qserv::http::Method method) { case lsst::qserv::http::Method::GET: return boost::beast::http::verb::get; case lsst::qserv::http::Method::POST: + case lsst::qserv::http::Method::MIMEPOST: return boost::beast::http::verb::post; case lsst::qserv::http::Method::PUT: return boost::beast::http::verb::put; diff --git a/src/http/Client.cc b/src/http/Client.cc index 7526efa93..1f4e2c690 100644 --- a/src/http/Client.cc +++ b/src/http/Client.cc @@ -52,6 +52,22 @@ Client::Client(http::Method method, string const& url, string const& data, vecto _headers(headers), _clientConfig(clientConfig), _connPool(connPool) { + if (_method == http::Method::MIMEPOST) { + throw invalid_argument("http::Client::" + string(__func__) + + ": method MIMEPOST is not allowed in this constructor"); + } + _hcurl = curl_easy_init(); + assert(_hcurl != nullptr); +} + +Client::Client(string const& url, list const& mimeData, vector const& headers, + ClientConfig const& clientConfig, shared_ptr const& connPool) + : _method(http::Method::MIMEPOST), + _url(url), + _mimeData(mimeData), + _headers(headers), + _clientConfig(clientConfig), + _connPool(connPool) { _hcurl = curl_easy_init(); assert(_hcurl != nullptr); } @@ -59,11 +75,13 @@ Client::Client(http::Method method, string const& url, string const& data, vecto Client::~Client() { curl_slist_free_all(_hlist); curl_easy_cleanup(_hcurl); + curl_mime_free(_form); } void Client::read(CallbackType const& onDataRead) { + string const context = "http::Client::" + string(__func__) + ": "; if (onDataRead == 
nullptr) { - throw invalid_argument("http::Client::" + string(__func__) + ": no callback provided"); + throw invalid_argument(context + "no callback provided"); } _onDataRead = onDataRead; @@ -75,21 +93,51 @@ void Client::read(CallbackType const& onDataRead) { curl_easy_setopt(_hcurl, CURLOPT_URL, _url.c_str())); _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_CUSTOMREQUEST)", curl_easy_setopt(_hcurl, CURLOPT_CUSTOMREQUEST, nullptr)); - if (_method == http::Method::GET) { - _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_HTTPGET)", - curl_easy_setopt(_hcurl, CURLOPT_HTTPGET, 1L)); - } else if (_method == http::Method::POST) { - _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_POST)", curl_easy_setopt(_hcurl, CURLOPT_POST, 1L)); + if (_method == http::Method::MIMEPOST) { + if (_mimeData.empty()) { + throw invalid_argument(context + "no data provided for MIMEPOST"); + } + curl_mime_free(_form); + _form = curl_mime_init(_hcurl); + _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_MIMEPOST)", + curl_easy_setopt(_hcurl, CURLOPT_MIMEPOST, _form)); + for (auto const& mimeEntry : _mimeData) { + curl_mimepart* field = curl_mime_addpart(_form); + _curlEasyErrorChecked("curl_mime_name", curl_mime_name(field, mimeEntry.name.c_str())); + if (mimeEntry.value.empty() == mimeEntry.filename.empty()) { + throw invalid_argument(context + "invalid JSON data provided for MIMEPOST"); + } + if (!mimeEntry.value.empty()) { + _curlEasyErrorChecked("curl_mime_data", + curl_mime_data(field, mimeEntry.value.c_str(), CURL_ZERO_TERMINATED)); + } else { + _curlEasyErrorChecked("curl_mime_filename", + curl_mime_filename(field, mimeEntry.filename.c_str())); + _curlEasyErrorChecked("curl_mime_filedata", + curl_mime_filedata(field, mimeEntry.filename.c_str())); + } + if (!mimeEntry.type.empty()) { + _curlEasyErrorChecked("curl_mime_type", curl_mime_type(field, mimeEntry.type.c_str())); + } + } } else { - _curlEasyErrorChecked( - "curl_easy_setopt(CURLOPT_CUSTOMREQUEST)", - curl_easy_setopt(_hcurl, 
CURLOPT_CUSTOMREQUEST, http::method2string(_method).data())); - } - if (!_data.empty()) { - _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_POSTFIELDS)", - curl_easy_setopt(_hcurl, CURLOPT_POSTFIELDS, _data.c_str())); - _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_POSTFIELDSIZE)", - curl_easy_setopt(_hcurl, CURLOPT_POSTFIELDSIZE, _data.size())); + if (_method == http::Method::GET) { + _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_HTTPGET)", + curl_easy_setopt(_hcurl, CURLOPT_HTTPGET, 1L)); + } else if (_method == http::Method::POST) { + _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_POST)", + curl_easy_setopt(_hcurl, CURLOPT_POST, 1L)); + } else { + _curlEasyErrorChecked( + "curl_easy_setopt(CURLOPT_CUSTOMREQUEST)", + curl_easy_setopt(_hcurl, CURLOPT_CUSTOMREQUEST, http::method2string(_method).data())); + } + if (!_data.empty()) { + _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_POSTFIELDS)", + curl_easy_setopt(_hcurl, CURLOPT_POSTFIELDS, _data.c_str())); + _curlEasyErrorChecked("curl_easy_setopt(CURLOPT_POSTFIELDSIZE)", + curl_easy_setopt(_hcurl, CURLOPT_POSTFIELDSIZE, _data.size())); + } } curl_slist_free_all(_hlist); _hlist = nullptr; diff --git a/src/http/Client.h b/src/http/Client.h index 6d3dc8162..88c5cd80d 100644 --- a/src/http/Client.h +++ b/src/http/Client.h @@ -23,6 +23,7 @@ // System headers #include +#include #include #include #include @@ -43,6 +44,16 @@ class ClientConnPool; // This header declarations namespace lsst::qserv::http { +/** + * The structure represents a single entry in the MIMEPOST request. + */ +struct ClientMimeEntry { + std::string name; ///< The required name of the form field. + std::string value; ///< The optional value of the form field. + std::string filename; ///< The optional name of the file to be uploaded. + std::string type; ///< The optional MIME type of the file. +}; + /** * Class Client is a simple interface for communicating over the HTTP protocol. 
* The implementation of the class invokes a user-supplied callback (lambda) function for @@ -84,6 +95,10 @@ class Client { ~Client(); /** + * Create a client object for communicating with a remote service. + * + * @note This form of the object construction is not allowed to be called with + the method 'http::Method::MIMEPOST'. * @param method An HTTP method. * @param url A location of the remote resoure. * @param data Optional data to be sent with a request (depends on the HTTP headers). @@ -96,6 +111,34 @@ class Client { ClientConfig const& clientConfig = ClientConfig(), std::shared_ptr const& connPool = nullptr); + /** + * Create a client object for sending a http::method::MIMEPOST request to a remote service. + * + * The body of the request will be in the 'multipart/form-data' format. A description of + * the body is provided in the 'mimeData' parameter. Here is an example illustrating how to + * use the method for uploading a file to a remote service: + * @code + * std::list const mimeData = { + * {"database", "lsst_dr01", "", ""}, + * {"table", "Object", "", ""}, + * {"schema", R"({"column":"object_id", "type": "BIGINT"})", "", "application/json"}, + * {"rows", "", "/tmp/chunk_123.txt", "text/plain"} + * }; + * http::Client client("https://qserv-int.lsst.net/ingest/chunk", mimeData); + * @code + * @note There is no need to specify the header {"Content-Type: multipart/form-data"} as it + * is added automatically by the method. + * @param url A location of the remote resoure. + * @param data A descriptor of the data to be sent in the 'multipart/form-data' body. + * @param headers Optional HTTP headers to be send with a request. + * @param clientConfig Optional configuration parameters of the reader. 
+ * @param connPool Optional connection pool + */ + Client(std::string const& url, std::list const& mimeData, + std::vector const& headers = std::vector(), + ClientConfig const& clientConfig = ClientConfig(), + std::shared_ptr const& connPool = nullptr); + /** * Begin processing a request. The whole content of the remote data source * refferred to by a URL passed into the constructor will be read. A callback @@ -172,6 +215,7 @@ class Client { http::Method const _method; std::string const _url; std::string const _data; + std::list const _mimeData; std::vector const _headers; ClientConfig const _clientConfig; std::shared_ptr const _connPool; @@ -181,6 +225,7 @@ class Client { // Cached members CURL* _hcurl = nullptr; curl_slist* _hlist = nullptr; + curl_mime* _form = nullptr; }; } // namespace lsst::qserv::http diff --git a/src/http/Method.cc b/src/http/Method.cc index b6937307a..bb8653c06 100644 --- a/src/http/Method.cc +++ b/src/http/Method.cc @@ -39,6 +39,8 @@ string method2string(Method method) { return "PUT"; case Method::DELETE: return "DELETE"; + case Method::MIMEPOST: + return "MIMEPOST"; } throw invalid_argument("http::" + string(__func__) + " invalid method " + to_string(static_cast(method))); @@ -53,6 +55,8 @@ Method string2method(string const& str) { return Method::PUT; else if ("DELETE" == str) return Method::DELETE; + else if ("MIMEPOST" == str) + return Method::MIMEPOST; throw invalid_argument("http::" + string(__func__) + " invalid method " + str); } diff --git a/src/http/Method.h b/src/http/Method.h index 8b02f9c7e..02302f0be 100644 --- a/src/http/Method.h +++ b/src/http/Method.h @@ -30,10 +30,10 @@ namespace lsst::qserv::http { /// The names of the allowed methods. -static std::vector const allowedMethods = {"GET", "POST", "PUT", "DELETE"}; +static std::vector const allowedMethods = {"GET", "POST", "PUT", "DELETE", "MIMEPOST"}; /// The type-safe representation of the HTTP methods. 
-enum class Method : int { GET, POST, PUT, DELETE }; +enum class Method : int { GET, POST, PUT, DELETE, MIMEPOST }; /// @return The string representation. /// @throws std::invalid_argument If the method is not valid. From 5e4d4087139a8588f4ef30ff2226485e90c16d75 Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Thu, 31 Oct 2024 18:33:10 -0700 Subject: [PATCH 4/6] Refactored the Czar REST service for ingesting user tables The rafactoring prepares ground for introducing another implementation of the REST API for ingesting tables in the "multi-part/form" body. The new intermediate base class will prevent code duplication after adding the second module. --- src/czar/CMakeLists.txt | 1 + src/czar/HttpCzarIngestModule.cc | 415 ++++----------------------- src/czar/HttpCzarIngestModule.h | 150 +--------- src/czar/HttpCzarIngestModuleBase.cc | 350 ++++++++++++++++++++++ src/czar/HttpCzarIngestModuleBase.h | 275 ++++++++++++++++++ 5 files changed, 691 insertions(+), 500 deletions(-) create mode 100644 src/czar/HttpCzarIngestModuleBase.cc create mode 100644 src/czar/HttpCzarIngestModuleBase.h diff --git a/src/czar/CMakeLists.txt b/src/czar/CMakeLists.txt index 4015913c7..719ed4931 100644 --- a/src/czar/CMakeLists.txt +++ b/src/czar/CMakeLists.txt @@ -3,6 +3,7 @@ add_library(czar OBJECT) target_sources(czar PRIVATE ChttpModule.cc Czar.cc + HttpCzarIngestModuleBase.cc HttpCzarIngestModule.cc HttpCzarQueryModule.cc HttpCzarSvc.cc diff --git a/src/czar/HttpCzarIngestModule.cc b/src/czar/HttpCzarIngestModule.cc index a0443d6a2..61c7904b8 100644 --- a/src/czar/HttpCzarIngestModule.cc +++ b/src/czar/HttpCzarIngestModule.cc @@ -28,15 +28,11 @@ #include #include -// Third party headers -#include "boost/algorithm/string.hpp" - // Qserv headers #include "cconfig/CzarConfig.h" #include "http/AsyncReq.h" #include "http/BinaryEncoding.h" #include "http/Exceptions.h" -#include "http/MetaModule.h" #include "http/RequestBodyJSON.h" #include "qhttp/Status.h" @@ -46,62 +42,6 @@ namespace 
cconfig = lsst::qserv::cconfig; namespace http = lsst::qserv::http; using json = nlohmann::json; -namespace { - -/// Database names provided by users must start with this prefix. -string const userDatabaseNamesPrefix = "user_"; - -/// @throw http::Error if the name is too short, or it doesn't start with the required prefix. -void verifyUserDatabaseName(string const& func, string const& databaseName) { - if ((databaseName.size() <= userDatabaseNamesPrefix.size()) || - !boost::iequals(databaseName.substr(0, userDatabaseNamesPrefix.size()), userDatabaseNamesPrefix)) { - auto err = "database name doesn't start with the prefix: " + userDatabaseNamesPrefix; - throw http::Error(func, err); - } -} - -/// Table names provided by users can not start with the Qserv-specific prefix. -string const qservTableNamesPrefix = "qserv"; - -/// @throw http::Error if the name is empty or starts with the reserved prefix. -void verifyUserTableName(string const& func, string const& tableName) { - if (tableName.empty()) throw http::Error(func, "table name is empty"); - if (boost::iequals(tableName.substr(0, qservTableNamesPrefix.size()), qservTableNamesPrefix)) { - auto err = "table name starts with the reserved prefix: " + qservTableNamesPrefix; - throw http::Error(func, err); - } -} - -size_t countDirectors(const json& database) { - size_t numDirectors = 0; - for (const auto& table : database.at("tables")) { - if (table.at("is_director").get() != 0) { - ++numDirectors; - } - } - return numDirectors; -} - -// These parameters correspond to the present partitioning model of 150k chunks. -// In reality the parameters are not needed for ingesting regular tables which is -// is the only kind of tables supported by the current implementation of the module. -// Some value of the parameters are still required by the Replication/Ingest system's API. 
-unsigned int defaultNumStripes = 340; -unsigned int defaultNumSubStripes = 3; -float defaultOverlap = 0.01667; - -string const defaultDirectorTableName = "qserv_director"; -unsigned int const defaultChunkId = 0; - -void setProtocolFields(json& data) { - data["version"] = http::MetaModule::version; - data["instance_id"] = cconfig::CzarConfig::instance()->replicationInstanceId(); - data["auth_key"] = cconfig::CzarConfig::instance()->replicationAuthKey(); - data["admin_auth_key"] = cconfig::CzarConfig::instance()->replicationAdminAuthKey(); -} - -} // namespace - namespace lsst::qserv::czar { void HttpCzarIngestModule::process(asio::io_service& io_service, string const& context, @@ -115,10 +55,8 @@ HttpCzarIngestModule::HttpCzarIngestModule(asio::io_service& io_service, string httplib::Request const& req, httplib::Response& resp) : http::ChttpModule(cconfig::CzarConfig::instance()->replicationAuthKey(), cconfig::CzarConfig::instance()->replicationAdminAuthKey(), req, resp), - _io_service(io_service), - _context(context), - _registryBaseUrl("http://" + cconfig::CzarConfig::instance()->replicationRegistryHost() + ":" + - to_string(cconfig::CzarConfig::instance()->replicationRegistryPort())) {} + HttpCzarIngestModuleBase(io_service), + _context(context) {} string HttpCzarIngestModule::context() const { return _context; } @@ -140,7 +78,7 @@ json HttpCzarIngestModule::_ingestData() { auto const databaseName = body().required("database"); auto const tableName = body().required("table"); - _timeoutSec = max(1U, body().optional("timeout", _timeoutSec)); + setTimeoutSec(max(1U, body().optional("timeout", timeoutSec()))); // This is needed for decoding values of the binary columns should they be present // in the table schema. 
@@ -150,10 +88,10 @@ json HttpCzarIngestModule::_ingestData() { debug(__func__, "database: '" + databaseName + "'"); debug(__func__, "table: '" + tableName + "'"); debug(__func__, "binary_encoding: '" + http::binaryEncoding2string(binaryEncodingMode) + "'"); - debug(__func__, "timeout: " + to_string(_timeoutSec)); + debug(__func__, "timeout: " + to_string(timeoutSec())); - ::verifyUserDatabaseName(__func__, databaseName); - ::verifyUserTableName(__func__, tableName); + verifyUserDatabaseName(__func__, databaseName); + verifyUserTableName(__func__, tableName); // Table schema is required to be an array of column descriptors if (!body().has("schema")) { @@ -188,74 +126,55 @@ json HttpCzarIngestModule::_ingestData() { } } - // Begin making changes in the persistent state of Qserv and the Replicaton/Ingest system - - _unpublishOrCreateDatabase(databaseName); - _createTable(databaseName, tableName, schema); - - uint32_t transactionId = 0; - try { - transactionId = _startTransaction(databaseName); - } catch (exception const& ex) { - _deleteTable(databaseName, tableName); - throw; - } - - // Send table data to all eligible workers and wait for the responses. - // Note that requests are sent in parallel, and the duration of each such request - // is limited by the timeout parameter. - json dataJson = json::object({{"transaction_id", transactionId}, - {"table", tableName}, - {"chunk", 0}, - {"overlap", 0}, - {"rows", rows}, - {"binary_encoding", http::binaryEncoding2string(binaryEncodingMode)}}); - ::setProtocolFields(dataJson); - auto const data = dataJson.dump(); - map> workerRequests; - for (auto const& workerId : _getWorkerIds()) { - auto const request = _asyncRequestWorker(workerId, data); - request->start(); - workerRequests[workerId] = request; - } - for (auto const& [workerId, request] : workerRequests) { - request->wait(); + // Make changes to the persistent state of Qserv and the Replicaton/Ingest system. + // Post warnings if any reported by the method. 
+ list> const warnings = ingestData( + databaseName, tableName, schema, indexes, [&](uint32_t transactionId) -> map { + // Send table data to all eligible workers and wait for the responses. + // Note that requests are sent in parallel, and the duration of each such request + // is limited by the timeout parameter. + json dataJson = + json::object({{"transaction_id", transactionId}, + {"table", tableName}, + {"chunk", 0}, + {"overlap", 0}, + {"rows", rows}, + {"binary_encoding", http::binaryEncoding2string(binaryEncodingMode)}}); + setProtocolFields(dataJson); + auto const data = dataJson.dump(); + map> workerRequests; + for (auto const& workerId : getWorkerIds()) { + auto const request = asyncRequestWorker(workerId, data); + request->start(); + workerRequests[workerId] = request; + } + for (auto const& [workerId, request] : workerRequests) { + request->wait(); + } + + // Process responses from workers. + map errors; + for (auto const& [workerId, request] : workerRequests) { + if (request->responseCode() != qhttp::STATUS_OK) { + errors[workerId] = "http_code: " + to_string(request->responseCode()); + continue; + } + json response; + try { + response = json::parse(request->responseBody()); + } catch (exception const& ex) { + errors[workerId] = "ex: " + string(ex.what()); + continue; + } + if (response.at("success").get() == 0) { + errors[workerId] = "error: " + response.at("error").get(); + } + } + return errors; + }); + for (auto const& warning : warnings) { + warn(warning.first, warning.second); } - - // Process responses from workers. 
- map workerErrors; - for (auto const& [workerId, request] : workerRequests) { - if (request->responseCode() != qhttp::STATUS_OK) { - workerErrors[workerId] = "http_code: " + to_string(request->responseCode()); - continue; - } - json response; - try { - response = json::parse(request->responseBody()); - } catch (exception const& ex) { - workerErrors[workerId] = "ex: " + string(ex.what()); - continue; - } - if (response.at("success").get() == 0) { - workerErrors[workerId] = "error: " + response.at("error").get(); - } - } - if (!workerErrors.empty()) { - _abortTransaction(transactionId); - _deleteTable(databaseName, tableName); - json const errorExt = json::object({{"worker_errors", workerErrors}}); - throw http::Error(__func__, "error(s) reported by workers", errorExt); - } - - // Success: commit the transaction and publish the database. - _commitTransaction(transactionId); - _publishDatabase(databaseName); - - // The post-ingest steps are optional. They are allowed to fail without affecting the success - // of the ingest. A warning will be reported in the response in case of a failure. 
- _createIndexes(__func__, databaseName, tableName, indexes); - _countRows(__func__, databaseName, tableName); - return json(); } @@ -264,13 +183,13 @@ json HttpCzarIngestModule::_deleteDatabase() { checkApiVersion(__func__, 34); auto const databaseName = params().at("database"); - _timeoutSec = max(1U, body().optional("timeout", _timeoutSec)); + setTimeoutSec(max(1U, body().optional("timeout", timeoutSec()))); debug(__func__, "database: '" + databaseName + "'"); - debug(__func__, "timeout: " + to_string(_timeoutSec)); + debug(__func__, "timeout: " + to_string(timeoutSec())); - ::verifyUserDatabaseName(__func__, databaseName); - _deleteDatabase(databaseName); + verifyUserDatabaseName(__func__, databaseName); + deleteDatabase(databaseName); return json(); } @@ -280,224 +199,16 @@ json HttpCzarIngestModule::_deleteTable() { auto const databaseName = params().at("database"); auto const tableName = params().at("table"); - _timeoutSec = max(1U, body().optional("timeout", _timeoutSec)); + setTimeoutSec(max(1U, body().optional("timeout", timeoutSec()))); debug(__func__, "database: '" + databaseName + "'"); debug(__func__, "table: '" + tableName + "'"); - debug(__func__, "timeout: " + to_string(_timeoutSec)); + debug(__func__, "timeout: " + to_string(timeoutSec())); - ::verifyUserDatabaseName(__func__, databaseName); - ::verifyUserTableName(__func__, tableName); - _deleteTable(databaseName, tableName); + verifyUserDatabaseName(__func__, databaseName); + verifyUserTableName(__func__, tableName); + deleteTable(databaseName, tableName); return json(); } -vector HttpCzarIngestModule::_getWorkerIds() { - vector workerIds; - auto const workersJson = _requestController(http::Method::GET, "/replication/config"); - for (auto const& worker : workersJson.at("config").at("workers")) { - bool const isEnabled = worker.at("is-enabled").get() != 0; - bool const isReadOnly = worker.at("is-read-only").get() != 0; - if (isEnabled && !isReadOnly) { - 
workerIds.push_back(worker.at("name").get()); - } - } - if (workerIds.empty()) { - throw http::Error(__func__, "no workers found in this Qserv instance"); - } - return workerIds; -} - -void HttpCzarIngestModule::_unpublishOrCreateDatabase(const string& databaseName) { - json const config = _requestController(http::Method::GET, "/replication/config").at("config"); - for (const auto& database : config.at("databases")) { - if (boost::iequals(database.at("database").get(), databaseName)) { - if (database.at("is_published").get() != 0) _unpublishDatabase(databaseName); - if (::countDirectors(database) == 0) _createDirectorTable(databaseName); - return; - } - } - _createDatabase(databaseName); - _createDirectorTable(databaseName); -} - -void HttpCzarIngestModule::_createDatabase(string const& databaseName) { - json data = json::object({{"database", databaseName}, - {"num_stripes", ::defaultNumStripes}, - {"num_sub_stripes", ::defaultNumSubStripes}, - {"overlap", ::defaultOverlap}}); - _requestController(http::Method::POST, "/ingest/database", data); -} - -void HttpCzarIngestModule::_deleteDatabase(string const& databaseName) { - json data = json::object(); - _requestController(http::Method::DELETE, "/ingest/database/" + databaseName, data); -} - -void HttpCzarIngestModule::_unpublishDatabase(string const& databaseName) { - json data = json::object({{"publish", 0}}); - _requestController(http::Method::PUT, "/replication/config/database/" + databaseName, data); -} - -void HttpCzarIngestModule::_publishDatabase(string const& databaseName) { - json data = json::object(); - _requestController(http::Method::PUT, "/ingest/database/" + databaseName, data); -} - -void HttpCzarIngestModule::_createTable(string const& databaseName, string const& tableName, - json const& schema) { - json data = json::object( - {{"database", databaseName}, {"table", tableName}, {"is_partitioned", 0}, {"schema", schema}}); - _requestController(http::Method::POST, "/ingest/table/", data); -} - -void 
HttpCzarIngestModule::_createDirectorTable(string const& databaseName) { - json const schema = json::array({{{"name", "objectId"}, {"type", "BIGINT"}}, - {{"name", "ra"}, {"type", "DOUBLE"}}, - {{"name", "dec"}, {"type", "DOUBLE"}}, - {{"name", "chunkId"}, {"type", "INT UNSIGNED NOT NULL"}}, - {{"name", "subChunkId"}, {"type", "INT UNSIGNED NOT NULL"}}}); - json data = json::object( - {{"description", "The mandatory director table of the catalog. The table may be empty."}, - {"fields_terminated_by", ","}, - {"database", databaseName}, - {"table", ::defaultDirectorTableName}, - {"is_partitioned", 1}, - {"is_director", 1}, - {"director_key", "objectId"}, - {"longitude_key", "ra"}, - {"latitude_key", "dec"}, - {"chunk_id_key", "chunkId"}, - {"sub_chunk_id_key", "subChunkId"}, - {"schema", schema}}); - _requestController(http::Method::POST, "/ingest/table/", data); - _allocateChunk(databaseName, ::defaultChunkId); -} - -void HttpCzarIngestModule::_deleteTable(string const& databaseName, string const& tableName) { - json data = json::object(); - _requestController(http::Method::DELETE, "/ingest/table/" + databaseName + "/" + tableName, data); -} - -uint32_t HttpCzarIngestModule::_startTransaction(string const& databaseName) { - json data = json::object({{"database", databaseName}}); - auto const response = _requestController(http::Method::POST, "/ingest/trans", data); - return response.at("databases").at(databaseName).at("transactions")[0].at("id").get(); -} - -void HttpCzarIngestModule::_abortOrCommitTransaction(uint32_t id, bool abort) { - json data = json::object(); - auto const service = "/ingest/trans/" + to_string(id) + "?abort=" + (abort ? 
"1" : "0"); - _requestController(http::Method::PUT, service, data); -} - -json HttpCzarIngestModule::_allocateChunk(string const& databaseName, unsigned int chunkId) { - json data = json::object({{"database", databaseName}, {"chunk", chunkId}}); - return _requestController(http::Method::POST, "/ingest/chunk", data); -} - -void HttpCzarIngestModule::_createIndexes(string const& func, string const& databaseName, - string const& tableName, json const& indexes) { - for (auto const& indexDef : indexes) { - if (!indexDef.is_object()) throw http::Error(func, "index definition is not a JSON object"); - try { - json data = indexDef; - data["database"] = databaseName; - data["table"] = tableName; - data["overlap"] = 0; - _requestController(http::Method::POST, "/replication/sql/index", data); - } catch (exception const& ex) { - warn(func, "index creation failed: " + string(ex.what())); - } - } -} - -void HttpCzarIngestModule::_countRows(string const& func, string const& databaseName, - string const& tableName) { - json data = json::object({{"database", databaseName}, - {"table", tableName}, - {"row_counters_state_update_policy", "ENABLED"}, - {"row_counters_deploy_at_qserv", 1}}); - try { - _requestController(http::Method::POST, "/ingest/table-stats", data); - } catch (exception const& ex) { - warn(func, "row count failed: " + string(ex.what())); - } -} - -string HttpCzarIngestModule::_controller() { - if (_controllerBaseUrl.empty()) { - auto const response = _requestRegistry(http::Method::GET, "/services"); - for (auto const& [id, controller] : response.at("services").at("controllers").items()) { - if (id == "master") { - _controllerBaseUrl = "http://" + controller.at("host-addr").get() + ":" + - to_string(controller.at("port").get()); - return _controllerBaseUrl; - } - } - throw http::Error(__func__, "no master controller found in the response"); - } - return _controllerBaseUrl; -} - -string HttpCzarIngestModule::_worker(string const& workerId) { - if 
(_workerBaseUrls.empty()) { - auto const response = _requestRegistry(http::Method::GET, "/services"); - for (auto const& [id, worker] : response.at("services").at("workers").items()) { - auto const replicationWorker = worker.at("replication"); - _workerBaseUrls[id] = "http://" + replicationWorker.at("host-addr").get() + ":" + - to_string(replicationWorker.at("http-loader-port").get()); - } - } - if (_workerBaseUrls.count(workerId) == 0) { - throw http::Error(__func__, "no connection parameters for worker: " + workerId); - } - return _workerBaseUrls.at(workerId); -} - -json HttpCzarIngestModule::_request(http::Method method, string const& url, json& data) { - json const errorExt = json::object( - {{"method", http::method2string(method)}, {"url", url}, {"timeout_sec", _timeoutSec}}); - auto const request = _asyncRequest(method, url, data); - request->start(); - request->wait(); - if (request->responseCode() != qhttp::STATUS_OK) { - throw http::Error(__func__, "http_code: " + to_string(request->responseCode()), errorExt); - } - json response; - try { - response = json::parse(request->responseBody()); - debug(__func__, "response: " + response.dump()); - } catch (exception const& ex) { - throw http::Error(__func__, "ex: " + string(ex.what()), errorExt); - } - if (response.at("success").get() == 0) { - throw http::Error(__func__, "error: " + response.at("error").get(), errorExt); - } - return response; -} - -shared_ptr HttpCzarIngestModule::_asyncRequest(http::Method method, string const& url, - json& data) { - shared_ptr request; - if (method == http::Method::GET) { - string const url_ = url + "?version=" + to_string(http::MetaModule::version) + - "&instance_id=" + cconfig::CzarConfig::instance()->replicationInstanceId(); - request = http::AsyncReq::create(_io_service, nullptr, method, url_); - } else { - ::setProtocolFields(data); - unordered_map const headers({{"Content-Type", "application/json"}}); - request = http::AsyncReq::create(_io_service, nullptr, method, url, 
data.dump(), headers); - } - request->setExpirationIval(_timeoutSec); - return request; -} - -shared_ptr HttpCzarIngestModule::_asyncPostRequest(string const& url, string const& data) { - unordered_map const headers({{"Content-Type", "application/json"}}); - auto const request = http::AsyncReq::create(_io_service, nullptr, http::Method::POST, url, data, headers); - request->setExpirationIval(_timeoutSec); - return request; -} - } // namespace lsst::qserv::czar diff --git a/src/czar/HttpCzarIngestModule.h b/src/czar/HttpCzarIngestModule.h index 25b53bdf3..3b370cf3f 100644 --- a/src/czar/HttpCzarIngestModule.h +++ b/src/czar/HttpCzarIngestModule.h @@ -22,25 +22,19 @@ #define LSST_QSERV_CZAR_HTTPCZARINGESTMODULE_H // System headers -#include #include #include -#include // Third party headers #include "boost/asio.hpp" #include "nlohmann/json.hpp" // Qserv headers +#include "czar/HttpCzarIngestModuleBase.h" #include "http/ChttpModule.h" -#include "http/Method.h" // Forward declarations -namespace lsst::qserv::http { -class AsyncReq; -} // namespace lsst::qserv::http - namespace httplib { class Request; class Response; @@ -53,7 +47,7 @@ namespace lsst::qserv::czar { * Class HttpCzarIngestModule implements a handler for processing requests for ingesting * user-generated data prodicts via the HTTP-based frontend. */ -class HttpCzarIngestModule : public http::ChttpModule { +class HttpCzarIngestModule : public http::ChttpModule, public HttpCzarIngestModuleBase { public: /** * @note supported values for parameter 'subModuleName' are: @@ -86,148 +80,8 @@ class HttpCzarIngestModule : public http::ChttpModule { nlohmann::json _deleteDatabase(); nlohmann::json _deleteTable(); - // The following methods are used to interact with the Replication Controller. - // The methods throw http::Error or other exceptions in case of communication - // errors or errors reported by the server. 
- - void _unpublishOrCreateDatabase(const std::string& databaseName); - void _createDatabase(std::string const& databaseName); - void _deleteDatabase(std::string const& databaseName); - void _unpublishDatabase(std::string const& databaseName); - void _publishDatabase(std::string const& databaseName); - - void _createTable(std::string const& databaseName, std::string const& tableName, - nlohmann::json const& schema); - void _createDirectorTable(std::string const& databaseName); - void _deleteTable(std::string const& databaseName, std::string const& tableName); - - std::uint32_t _startTransaction(std::string const& databaseName); - void _abortTransaction(std::uint32_t id) { _abortOrCommitTransaction(id, true); } - void _commitTransaction(std::uint32_t id) { _abortOrCommitTransaction(id, false); } - void _abortOrCommitTransaction(std::uint32_t id, bool abort); - - nlohmann::json _allocateChunk(std::string const& databaseName, unsigned int chunkId); - - void _createIndexes(std::string const& func, std::string const& databaseName, - std::string const& tableName, nlohmann::json const& indexes); - void _countRows(std::string const& func, std::string const& databaseName, std::string const& tableName); - - std::vector _getWorkerIds(); - - /** - * Pull connection parameters of the Master Replication Controller from Registry - * and build the base path of the Controller's service. The result will be cached. - * The method will return the cached value if the one is already available. - * @return The base URL for the Controller. - */ - std::string _controller(); - - /** - * Pull connection parameters of the specified worker from the Master Replication Controller - * and build the base path of the service. The result will be cached. - * The method will return the cached value if the one is already available. - * @param workerId The worker's identifier. - * @return The base URL for the worker. 
- */ - std::string _worker(std::string const& workerId); - - /** - * Send a request to the Master Replication Controller. - * @param method HTTP method for the request. - * @param service The REST service to be called (not a complete URL). - * @param data Data to be sent with the request. - * @return nlohmann::json A result (JSON object) reported by the server. - */ - nlohmann::json _requestController(http::Method method, std::string const& service, nlohmann::json& data) { - return _request(method, _controller() + service, data); - } - - nlohmann::json _requestController(http::Method method, std::string const& service) { - nlohmann::json data; - return _requestController(method, service, data); - } - - /** - * Send a request to the Registry. - * @param method HTTP method for the request. - * @param service The REST service to be called (not a complete URL). - * @return nlohmann::json A result (JSON object) reported by the server. - */ - nlohmann::json _requestRegistry(http::Method method, std::string const& service) { - return _request(method, _registryBaseUrl + service); - } - - /** - * Send a request to a server, wait for its completion and extract a result). - * @note The data object may be extended by the method to include additional - * attrubutes required for the request, including the version number of - * the REST API and the authorization keys. - * @param method HTTP method for the request. - * @param url A complete URL for the REST service to be called. - * @param data Data to be sent with the request. - * @return nlohmann::json A result (JSON object) reported by the server. - * @throw http::Error for specific errors reported by the client library. - * @throw std::runtime_error In case if a error was received from the server. 
- */ - nlohmann::json _request(http::Method method, std::string const& url, nlohmann::json& data); - - nlohmann::json _request(http::Method method, std::string const& url) { - nlohmann::json data; - return _request(method, url, data); - } - - /** - * Create an asynchronous POST request to the specified Replication Worker. - * @note The request won't be started. It's up to a caller to do so. - * @param workerId The worker's identifier. - * @param method HTTP method for the request. - * @param data Serialized JSON object to be sent with the request. - * @return std::shared_ptr A pointer to the request object. - */ - std::shared_ptr _asyncRequestWorker(std::string const& workerId, - std::string const& data) { - return _asyncPostRequest(_worker(workerId) + "/ingest/data", data); - } - - /** - * Create an asynchronous request to the server. - * @note The data object may be extended by the method to include additional - * attrubutes required for the request, including the version number of - * the REST API and the authorization keys. - * @note The request won't be started. It's up to a caller to do so. - * @param method HTTP method for the request. - * @param url A complete URL for the REST service to be called. - * @param data Data to be sent with the request. - * @return std::shared_ptr A pointer to the request object. - * @throw http::Error for specific errors reported by the client library. - */ - std::shared_ptr _asyncRequest(http::Method method, std::string const& url, - nlohmann::json& data); - - /** - * Create an asynchronous POST request to the server. - * @note The request won't be started. It's up to a caller to do so. - * @param url A complete URL for the REST service to be called. - * @param data Serialized JSON object to be sent with the request. - * @return std::shared_ptr A pointer to the request object. - * @throw http::Error for specific errors reported by the client library. 
- */ - std::shared_ptr _asyncPostRequest(std::string const& url, std::string const& data); - - /// I/O service for async TCP communications. - boost::asio::io_service& _io_service; - /// The context string for posting messages into the logging stream. std::string const _context; - - /// Base URL for communications with the Registry server. - std::string const _registryBaseUrl; - - // Parameters set upon the request processing. - - unsigned int _timeoutSec = 300; ///< The default timeout for requests. - std::string _controllerBaseUrl; ///< The cached URL for the Controller's REST service. - std::map _workerBaseUrls; ///< The cached URLs for workers' REST services. }; } // namespace lsst::qserv::czar diff --git a/src/czar/HttpCzarIngestModuleBase.cc b/src/czar/HttpCzarIngestModuleBase.cc new file mode 100644 index 000000000..d9fc6d917 --- /dev/null +++ b/src/czar/HttpCzarIngestModuleBase.cc @@ -0,0 +1,350 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . 
+ */ + +// Class header +#include "czar/HttpCzarIngestModuleBase.h" + +// System headers +#include +#include +#include +#include + +// Third party headers +#include "boost/algorithm/string.hpp" + +// Qserv headers +#include "cconfig/CzarConfig.h" +#include "http/AsyncReq.h" +#include "http/BinaryEncoding.h" +#include "http/Exceptions.h" +#include "http/MetaModule.h" +#include "http/RequestBodyJSON.h" +#include "qhttp/Status.h" + +using namespace std; +namespace asio = boost::asio; +namespace cconfig = lsst::qserv::cconfig; +namespace http = lsst::qserv::http; +using json = nlohmann::json; + +namespace { + +size_t countDirectors(const json& database) { + size_t numDirectors = 0; + for (const auto& table : database.at("tables")) { + if (table.at("is_director").get() != 0) { + ++numDirectors; + } + } + return numDirectors; +} + +// These parameters correspond to the present partitioning model of 150k chunks. +// In reality the parameters are not needed for ingesting regular tables which is +// is the only kind of tables supported by the current implementation of the module. +// Some value of the parameters are still required by the Replication/Ingest system's API. 
+unsigned int defaultNumStripes = 340; +unsigned int defaultNumSubStripes = 3; +float defaultOverlap = 0.01667; + +string const defaultDirectorTableName = "qserv_director"; +unsigned int const defaultChunkId = 0; + +} // namespace + +namespace lsst::qserv::czar { + +HttpCzarIngestModuleBase::HttpCzarIngestModuleBase(asio::io_service& io_service) + : _io_service(io_service), + _registryBaseUrl("http://" + cconfig::CzarConfig::instance()->replicationRegistryHost() + ":" + + to_string(cconfig::CzarConfig::instance()->replicationRegistryPort())) {} + +list> HttpCzarIngestModuleBase::ingestData( + string const& databaseName, string const& tableName, json const& schema, json const& indexes, + function(uint32_t)> const& submitRequestsToWorkers) { + _unpublishOrCreateDatabase(databaseName); + _createTable(databaseName, tableName, schema); + + uint32_t transactionId = 0; + try { + transactionId = _startTransaction(databaseName); + } catch (exception const& ex) { + deleteTable(databaseName, tableName); + throw; + } + + map const workerErrors = submitRequestsToWorkers(transactionId); + + if (!workerErrors.empty()) { + _abortTransaction(transactionId); + deleteTable(databaseName, tableName); + json const errorExt = json::object({{"worker_errors", workerErrors}}); + throw http::Error(__func__, "error(s) reported by workers", errorExt); + } + + // Success: commit the transaction and publish the database. + _commitTransaction(transactionId); + _publishDatabase(databaseName); + + // The post-ingest steps are optional. They are allowed to fail without affecting the success + // of the ingest. A warning will be reported in the response in case of a failure. 
+ list> warnings; + _createIndexes(__func__, databaseName, tableName, indexes, warnings); + _countRows(__func__, databaseName, tableName, warnings); + return warnings; +} + +void HttpCzarIngestModuleBase::verifyUserDatabaseName(string const& func, string const& databaseName) { + string const userDatabaseNamesPrefix = "user_"; + if ((databaseName.size() <= userDatabaseNamesPrefix.size()) || + !boost::iequals(databaseName.substr(0, userDatabaseNamesPrefix.size()), userDatabaseNamesPrefix)) { + auto err = "database name doesn't start with the prefix: " + userDatabaseNamesPrefix; + throw http::Error(func, err); + } +} + +void HttpCzarIngestModuleBase::verifyUserTableName(string const& func, string const& tableName) { + string const qservTableNamesPrefix = "qserv_"; + if (tableName.empty()) throw http::Error(func, "table name is empty"); + if (boost::iequals(tableName.substr(0, qservTableNamesPrefix.size()), qservTableNamesPrefix)) { + auto err = "table name starts with the reserved prefix: " + qservTableNamesPrefix; + throw http::Error(func, err); + } +} + +void HttpCzarIngestModuleBase::deleteDatabase(string const& databaseName) { + json data = json::object(); + _requestController(http::Method::DELETE, "/ingest/database/" + databaseName, data); +} + +void HttpCzarIngestModuleBase::deleteTable(string const& databaseName, string const& tableName) { + json data = json::object(); + _requestController(http::Method::DELETE, "/ingest/table/" + databaseName + "/" + tableName, data); +} + +vector HttpCzarIngestModuleBase::getWorkerIds() { + vector workerIds; + auto const workersJson = _requestController(http::Method::GET, "/replication/config"); + for (auto const& worker : workersJson.at("config").at("workers")) { + bool const isEnabled = worker.at("is-enabled").get() != 0; + bool const isReadOnly = worker.at("is-read-only").get() != 0; + if (isEnabled && !isReadOnly) { + workerIds.push_back(worker.at("name").get()); + } + } + if (workerIds.empty()) { + throw 
http::Error(__func__, "no workers found in this Qserv instance"); + } + return workerIds; +} + +void HttpCzarIngestModuleBase::_unpublishOrCreateDatabase(const string& databaseName) { + json const config = _requestController(http::Method::GET, "/replication/config").at("config"); + for (const auto& database : config.at("databases")) { + if (boost::iequals(database.at("database").get(), databaseName)) { + if (database.at("is_published").get() != 0) _unpublishDatabase(databaseName); + if (::countDirectors(database) == 0) _createDirectorTable(databaseName); + return; + } + } + _createDatabase(databaseName); + _createDirectorTable(databaseName); +} + +void HttpCzarIngestModuleBase::_createDatabase(string const& databaseName) { + json data = json::object({{"database", databaseName}, + {"num_stripes", ::defaultNumStripes}, + {"num_sub_stripes", ::defaultNumSubStripes}, + {"overlap", ::defaultOverlap}}); + _requestController(http::Method::POST, "/ingest/database", data); +} + +void HttpCzarIngestModuleBase::_unpublishDatabase(string const& databaseName) { + json data = json::object({{"publish", 0}}); + _requestController(http::Method::PUT, "/replication/config/database/" + databaseName, data); +} + +void HttpCzarIngestModuleBase::_publishDatabase(string const& databaseName) { + json data = json::object(); + _requestController(http::Method::PUT, "/ingest/database/" + databaseName, data); +} + +void HttpCzarIngestModuleBase::_createTable(string const& databaseName, string const& tableName, + json const& schema) { + json data = json::object( + {{"database", databaseName}, {"table", tableName}, {"is_partitioned", 0}, {"schema", schema}}); + _requestController(http::Method::POST, "/ingest/table/", data); +} + +void HttpCzarIngestModuleBase::_createDirectorTable(string const& databaseName) { + json const schema = json::array({{{"name", "objectId"}, {"type", "BIGINT"}}, + {{"name", "ra"}, {"type", "DOUBLE"}}, + {{"name", "dec"}, {"type", "DOUBLE"}}, + {{"name", "chunkId"}, 
{"type", "INT UNSIGNED NOT NULL"}}, + {{"name", "subChunkId"}, {"type", "INT UNSIGNED NOT NULL"}}}); + json data = json::object( + {{"description", "The mandatory director table of the catalog. The table may be empty."}, + {"fields_terminated_by", ","}, + {"database", databaseName}, + {"table", ::defaultDirectorTableName}, + {"is_partitioned", 1}, + {"is_director", 1}, + {"director_key", "objectId"}, + {"longitude_key", "ra"}, + {"latitude_key", "dec"}, + {"chunk_id_key", "chunkId"}, + {"sub_chunk_id_key", "subChunkId"}, + {"schema", schema}}); + _requestController(http::Method::POST, "/ingest/table/", data); + _allocateChunk(databaseName, ::defaultChunkId); +} + +uint32_t HttpCzarIngestModuleBase::_startTransaction(string const& databaseName) { + json data = json::object({{"database", databaseName}}); + auto const response = _requestController(http::Method::POST, "/ingest/trans", data); + return response.at("databases").at(databaseName).at("transactions")[0].at("id").get(); +} + +void HttpCzarIngestModuleBase::_abortOrCommitTransaction(uint32_t id, bool abort) { + json data = json::object(); + auto const service = "/ingest/trans/" + to_string(id) + "?abort=" + (abort ? 
"1" : "0"); + _requestController(http::Method::PUT, service, data); +} + +json HttpCzarIngestModuleBase::_allocateChunk(string const& databaseName, unsigned int chunkId) { + json data = json::object({{"database", databaseName}, {"chunk", chunkId}}); + return _requestController(http::Method::POST, "/ingest/chunk", data); +} + +void HttpCzarIngestModuleBase::_createIndexes(string const& func, string const& databaseName, + string const& tableName, json const& indexes, + list>& warnings) { + for (auto const& indexDef : indexes) { + if (!indexDef.is_object()) throw http::Error(func, "index definition is not a JSON object"); + try { + json data = indexDef; + data["database"] = databaseName; + data["table"] = tableName; + data["overlap"] = 0; + _requestController(http::Method::POST, "/replication/sql/index", data); + } catch (exception const& ex) { + warnings.emplace_back(func, "index creation failed: " + string(ex.what())); + } + } +} + +void HttpCzarIngestModuleBase::_countRows(string const& func, string const& databaseName, + string const& tableName, list>& warnings) { + json data = json::object({{"database", databaseName}, + {"table", tableName}, + {"row_counters_state_update_policy", "ENABLED"}, + {"row_counters_deploy_at_qserv", 1}}); + try { + _requestController(http::Method::POST, "/ingest/table-stats", data); + } catch (exception const& ex) { + warnings.emplace_back(func, "row count failed: " + string(ex.what())); + } +} + +string HttpCzarIngestModuleBase::_controller() { + if (_controllerBaseUrl.empty()) { + auto const response = _requestRegistry(http::Method::GET, "/services"); + for (auto const& [id, controller] : response.at("services").at("controllers").items()) { + if (id == "master") { + _controllerBaseUrl = "http://" + controller.at("host-addr").get() + ":" + + to_string(controller.at("port").get()); + return _controllerBaseUrl; + } + } + throw http::Error(__func__, "no master controller found in the response"); + } + return _controllerBaseUrl; +} + 
+string HttpCzarIngestModuleBase::_worker(string const& workerId) { + if (_workerBaseUrls.empty()) { + auto const response = _requestRegistry(http::Method::GET, "/services"); + for (auto const& [id, worker] : response.at("services").at("workers").items()) { + auto const replicationWorker = worker.at("replication"); + _workerBaseUrls[id] = "http://" + replicationWorker.at("host-addr").get() + ":" + + to_string(replicationWorker.at("http-loader-port").get()); + } + } + if (_workerBaseUrls.count(workerId) == 0) { + throw http::Error(__func__, "no connection parameters for worker: " + workerId); + } + return _workerBaseUrls.at(workerId); +} + +json HttpCzarIngestModuleBase::_request(http::Method method, string const& url, json& data) { + json const errorExt = json::object( + {{"method", http::method2string(method)}, {"url", url}, {"timeout_sec", _timeoutSec}}); + auto const request = _asyncRequest(method, url, data); + request->start(); + request->wait(); + if (request->responseCode() != qhttp::STATUS_OK) { + throw http::Error(__func__, "http_code: " + to_string(request->responseCode()), errorExt); + } + json response; + try { + response = json::parse(request->responseBody()); + } catch (exception const& ex) { + throw http::Error(__func__, "ex: " + string(ex.what()), errorExt); + } + if (response.at("success").get() == 0) { + throw http::Error(__func__, "error: " + response.at("error").get(), errorExt); + } + return response; +} + +shared_ptr HttpCzarIngestModuleBase::_asyncRequest(http::Method method, string const& url, + json& data) { + shared_ptr request; + if (method == http::Method::GET) { + string const url_ = url + "?version=" + to_string(http::MetaModule::version) + + "&instance_id=" + cconfig::CzarConfig::instance()->replicationInstanceId(); + request = http::AsyncReq::create(_io_service, nullptr, method, url_); + } else { + setProtocolFields(data); + unordered_map const headers({{"Content-Type", "application/json"}}); + request = 
http::AsyncReq::create(_io_service, nullptr, method, url, data.dump(), headers); + } + request->setExpirationIval(_timeoutSec); + return request; +} + +shared_ptr HttpCzarIngestModuleBase::_asyncPostRequest(string const& url, + string const& data) { + unordered_map const headers({{"Content-Type", "application/json"}}); + auto const request = http::AsyncReq::create(_io_service, nullptr, http::Method::POST, url, data, headers); + request->setExpirationIval(_timeoutSec); + return request; +} + +void HttpCzarIngestModuleBase::setProtocolFields(json& data) const { + data["version"] = http::MetaModule::version; + data["instance_id"] = cconfig::CzarConfig::instance()->replicationInstanceId(); + data["auth_key"] = cconfig::CzarConfig::instance()->replicationAuthKey(); + data["admin_auth_key"] = cconfig::CzarConfig::instance()->replicationAdminAuthKey(); +} + +} // namespace lsst::qserv::czar diff --git a/src/czar/HttpCzarIngestModuleBase.h b/src/czar/HttpCzarIngestModuleBase.h new file mode 100644 index 000000000..863a0b03f --- /dev/null +++ b/src/czar/HttpCzarIngestModuleBase.h @@ -0,0 +1,275 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . 
+ */ +#ifndef LSST_QSERV_CZAR_HTTPCZARINGESTMODULEBASE_H +#define LSST_QSERV_CZAR_HTTPCZARINGESTMODULEBASE_H + +// System headers +#include +#include +#include +#include +#include +#include + +// Third party headers +#include "boost/asio.hpp" +#include "nlohmann/json.hpp" + +// Qserv headers +#include "http/Method.h" + +// Forward declarations + +namespace lsst::qserv::http { +class AsyncReq; +} // namespace lsst::qserv::http + +// This header declarations +namespace lsst::qserv::czar { + +/** + * Class HttpCzarIngestModuleBase is a base class for a family of the Czar ingest modules. + * A purpose of the class is to provide subclasses with common services and data, and avoid code + * duplication should each subclass had its own implementation of the services. + */ +class HttpCzarIngestModuleBase { +public: + HttpCzarIngestModuleBase() = delete; + HttpCzarIngestModuleBase(HttpCzarIngestModuleBase const&) = delete; + HttpCzarIngestModuleBase& operator=(HttpCzarIngestModuleBase const&) = delete; + + virtual ~HttpCzarIngestModuleBase() = default; + +protected: + HttpCzarIngestModuleBase(boost::asio::io_service& io_service); + + // HTTP timeout management methods. + + void setTimeoutSec(unsigned int timeoutSec) { _timeoutSec = timeoutSec; } + unsigned int timeoutSec() const { return _timeoutSec; } + + /** + * Ingest the table into the Qserv. + * @param databaseName The name of the database to ingest the data into. + * @param tableName The name of the table to ingest the data into. + * @param schema The schema of the table. + * @param indexes The indexes to be created for the table. + * @param submitRequestsToWorkers A function to submit requests to the workers. The function is + * expected to return a map of worker identifiers and error messages reported by the workers. + * The only input parameter of the function is the transaction identifier. 
+ * @return A collection of warnings reported by the ingest process, where each entry is represented by + * a pair of a scope (function) and a message of the warning. + * @throw http::Error In case of a communication error or an error reported by the server. + */ + std::list> ingestData( + std::string const& databaseName, std::string const& tableName, nlohmann::json const& schema, + nlohmann::json const& indexes, + std::function(uint32_t)> const& submitRequestsToWorkers); + + /** + * Verify the user-provided database name to ensure the name starts with the reserved + * prefix "user_". + * @param func The name of the calling function. + * @param databaseName The name of the database to be verified. + * @throw http::Error if the name is too short, or it doesn't start with the required prefix. + */ + static void verifyUserDatabaseName(std::string const& func, std::string const& databaseName); + + /** + * Verify the user-provided table name to ensure the name doesn't start with the reserved + * prefix "qserv_". + * @param func The name of the calling function. + * @param tableName The name of the table to be verified. + * @throw http::Error if the name is too short, or it starts with the reserved prefix. + */ + static void verifyUserTableName(std::string const& func, std::string const& tableName); + + /** + * Delete the specified database in Qserv. + * @param databaseName The name of the database to be deleted. + * @throw http::Error In case of a communication error or an error reported by the server. + */ + void deleteDatabase(std::string const& databaseName); + + /** + * Delete the specified table in Qserv. + * @param databaseName The name of the database were the table is residing. + * @param tableName The name of the table to be deleted. + * @throw http::Error In case of a communication error or an error reported by the server. + */ + void deleteTable(std::string const& databaseName, std::string const& tableName); + + /** + * Get the list of worker identifiers. 
+ * @return A vector of worker identifiers. + * @throw http::Error In case of a communication error or an error reported by the server. + */ + std::vector getWorkerIds(); + + /** + * Create an asynchronous POST request to the specified Replication Worker. + * @note The request won't be started. It's up to a caller to do so. + * @param workerId The worker's identifier. + * @param method HTTP method for the request. + * @param data Serialized JSON object to be sent with the request. + * @return std::shared_ptr A pointer to the request object. + */ + std::shared_ptr asyncRequestWorker(std::string const& workerId, std::string const& data) { + return _asyncPostRequest(_worker(workerId) + "/ingest/data", data); + } + + /** + * Set the protocol fields in the JSON object. + * @param data The JSON object to be updated. + */ + void setProtocolFields(nlohmann::json& data) const; + +private: + // The following methods are used to interact with the Replication Controller. + // The methods throw http::Error or other exceptions in case of communication + // errors or errors reported by the server. 
+ + void _unpublishOrCreateDatabase(const std::string& databaseName); + void _createDatabase(std::string const& databaseName); + void _unpublishDatabase(std::string const& databaseName); + void _publishDatabase(std::string const& databaseName); + + void _createTable(std::string const& databaseName, std::string const& tableName, + nlohmann::json const& schema); + void _createDirectorTable(std::string const& databaseName); + + std::uint32_t _startTransaction(std::string const& databaseName); + void _abortTransaction(std::uint32_t id) { _abortOrCommitTransaction(id, true); } + void _commitTransaction(std::uint32_t id) { _abortOrCommitTransaction(id, false); } + void _abortOrCommitTransaction(std::uint32_t id, bool abort); + + nlohmann::json _allocateChunk(std::string const& databaseName, unsigned int chunkId); + + void _createIndexes(std::string const& func, std::string const& databaseName, + std::string const& tableName, nlohmann::json const& indexes, + std::list>& warnings); + + void _countRows(std::string const& func, std::string const& databaseName, std::string const& tableName, + std::list>& warnings); + + /** + * Pull connection parameters of the Master Replication Controller from Registry + * and build the base path of the Controller's service. The result will be cached. + * The method will return the cached value if the one is already available. + * @return The base URL for the Controller. + */ + std::string _controller(); + + /** + * Pull connection parameters of the specified worker from the Master Replication Controller + * and build the base path of the service. The result will be cached. + * The method will return the cached value if the one is already available. + * @param workerId The worker's identifier. + * @return The base URL for the worker. + */ + std::string _worker(std::string const& workerId); + + /** + * Send a request to the Master Replication Controller. + * @param method HTTP method for the request. 
+ * @param service The REST service to be called (not a complete URL). + * @param data Data to be sent with the request. + * @return nlohmann::json A result (JSON object) reported by the server. + */ + nlohmann::json _requestController(http::Method method, std::string const& service, nlohmann::json& data) { + return _request(method, _controller() + service, data); + } + + nlohmann::json _requestController(http::Method method, std::string const& service) { + nlohmann::json data; + return _requestController(method, service, data); + } + + /** + * Send a request to the Registry. + * @param method HTTP method for the request. + * @param service The REST service to be called (not a complete URL). + * @return nlohmann::json A result (JSON object) reported by the server. + */ + nlohmann::json _requestRegistry(http::Method method, std::string const& service) { + return _request(method, _registryBaseUrl + service); + } + + /** + * Send a request to a server, wait for its completion and extract a result). + * @note The data object may be extended by the method to include additional + * attrubutes required for the request, including the version number of + * the REST API and the authorization keys. + * @param method HTTP method for the request. + * @param url A complete URL for the REST service to be called. + * @param data Data to be sent with the request. + * @return nlohmann::json A result (JSON object) reported by the server. + * @throw http::Error for specific errors reported by the client library. + * @throw std::runtime_error In case if a error was received from the server. + */ + nlohmann::json _request(http::Method method, std::string const& url, nlohmann::json& data); + + nlohmann::json _request(http::Method method, std::string const& url) { + nlohmann::json data; + return _request(method, url, data); + } + + /** + * Create an asynchronous request to the server. 
+ * @note The data object may be extended by the method to include additional + * attrubutes required for the request, including the version number of + * the REST API and the authorization keys. + * @note The request won't be started. It's up to a caller to do so. + * @param method HTTP method for the request. + * @param url A complete URL for the REST service to be called. + * @param data Data to be sent with the request. + * @return std::shared_ptr A pointer to the request object. + * @throw http::Error for specific errors reported by the client library. + */ + std::shared_ptr _asyncRequest(http::Method method, std::string const& url, + nlohmann::json& data); + + /** + * Create an asynchronous POST request to the server. + * @note The request won't be started. It's up to a caller to do so. + * @param url A complete URL for the REST service to be called. + * @param data Serialized JSON object to be sent with the request. + * @return std::shared_ptr A pointer to the request object. + * @throw http::Error for specific errors reported by the client library. + */ + std::shared_ptr _asyncPostRequest(std::string const& url, std::string const& data); + + /// I/O service for async TCP communications. + boost::asio::io_service& _io_service; + + /// Base URL for communications with the Registry server. + std::string const _registryBaseUrl; + + // Parameters set upon the request processing. + + unsigned int _timeoutSec = 300; ///< The default timeout for requests. + std::string _controllerBaseUrl; ///< The cached URL for the Controller's REST service. + std::map _workerBaseUrls; ///< The cached URLs for workers' REST services. 
+}; + +} // namespace lsst::qserv::czar + +#endif // LSST_QSERV_CZAR_HTTPCZARINGESTMODULEBASE_H From 1a859678590f4e097bfa0c651ca3f3cdfc573efb Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Fri, 1 Nov 2024 09:49:39 -0700 Subject: [PATCH 5/6] Add support for CSV and multipart/form-data in Czar HTTP frontend Refactored the command line application qserv-czar-http to use Boost options for command line parsing. Extended the integration test scripts and the configuration file accordingly. --- admin/local/docker/compose/docker-compose.yml | 8 +- .../python/lsst/qserv/admin/cli/entrypoint.py | 53 +++- src/czar/CMakeLists.txt | 3 + src/czar/HttpCzarIngestCsvModule.cc | 262 ++++++++++++++++++ src/czar/HttpCzarIngestCsvModule.h | 130 +++++++++ src/czar/HttpCzarIngestModuleBase.cc | 24 ++ src/czar/HttpCzarIngestModuleBase.h | 38 +++ src/czar/HttpCzarSvc.cc | 34 ++- src/czar/HttpCzarSvc.h | 32 ++- src/czar/WorkerIngestProcessor.cc | 50 ++++ src/czar/WorkerIngestProcessor.h | 115 ++++++++ src/czar/qserv-czar-http.cc | 123 +++++--- 12 files changed, 806 insertions(+), 66 deletions(-) create mode 100644 src/czar/HttpCzarIngestCsvModule.cc create mode 100644 src/czar/HttpCzarIngestCsvModule.h create mode 100644 src/czar/WorkerIngestProcessor.cc create mode 100644 src/czar/WorkerIngestProcessor.h diff --git a/admin/local/docker/compose/docker-compose.yml b/admin/local/docker/compose/docker-compose.yml index a8c13e90d..6fd2dfeba 100644 --- a/admin/local/docker/compose/docker-compose.yml +++ b/admin/local/docker/compose/docker-compose.yml @@ -397,10 +397,14 @@ services: entrypoint --log-level DEBUG czar-http --db-uri mysql://qsmaster@czar_mariadb:3306/ --xrootd-manager czar_xrootd - --http-frontend-port 4048 - --http-frontend-threads 4 + --czar-name http + --http-port 4048 + --http-threads 4 + --http-worker-ingest-threads 2 --http-ssl-cert-file /config-etc/ssl/czar-cert.pem --http-ssl-private-key-file /config-etc/ssl/czar-key.pem + --http-tmp-dir /tmp + --http-conn-pool-size 2 
--log-cfg-file=/config-etc/log/log-czar-proxy.cnf --repl-instance-id qserv_proj --repl-auth-key replauthkey diff --git a/src/admin/python/lsst/qserv/admin/cli/entrypoint.py b/src/admin/python/lsst/qserv/admin/cli/entrypoint.py index 32c409a70..6b003bce2 100644 --- a/src/admin/python/lsst/qserv/admin/cli/entrypoint.py +++ b/src/admin/python/lsst/qserv/admin/cli/entrypoint.py @@ -139,12 +139,16 @@ class CommandInfo: )), ("czar-http", CommandInfo( "qserv-czar-http " - "http " - "{{czar_cfg_path}} " - "{{http_frontend_port}} " - "{{http_frontend_threads}} " - "{{http_ssl_cert_file}} " - "{{http_ssl_private_key_file}}", + "--czar-name {{czar_name}} " + "--config {{czar_cfg_path}} " + "--port {{http_port}} " + "--threads {{http_threads}} " + "--worker-ingest-threads {{http_worker_ingest_threads}} " + "--ssl-cert-file {{http_ssl_cert_file}} " + "--ssl-private-key-file {{http_ssl_private_key_file}} " + "--tmp-dir {{http_tmp_dir}} " + "--conn-pool-size {{http_conn_pool_size}} " + "--verbose", )), ("cmsd-manager", CommandInfo( "cmsd -c {{cmsd_manager_cfg_path}} -n manager -I v4", @@ -564,17 +568,24 @@ def proxy(ctx: click.Context, **kwargs: Any) -> None: @option_mysql_monitor_password() @option_xrootd_manager(required=True) @click.option( - "--http-frontend-port", + "--http-port", default="4048", show_default=True, - help="The HTTP port of the frontend. The value of http_frontend_port is passed as a command-line" + help="The HTTP port of the frontend. The value of the parameter is passed as a command-line" " parameter to the application." ) @click.option( - "--http-frontend-threads", + "--http-threads", default="2", show_default=True, - help="The number of threads for the HTTP server of the frontend. The value of http_frontend_threads is passed" + help="The number of the request processing threads in the REST service. The value of the parameter is passed" + " as a command-line parameter to the application." 
+) +@click.option( + "--http-worker-ingest-threads", + default="2", + show_default=True, + help="The number of the request processing threads in the REST service. The value of the parameter is passed" " as a command-line parameter to the application." ) @click.option( @@ -601,6 +612,28 @@ def proxy(ctx: click.Context, **kwargs: Any) -> None: default=czar_cfg_path, show_default=True, ) +@click.option( + "--http-tmp-dir", + help="The temporary directory for the HTTP server of the frontend.", + default="/tmp", + show_default=True, +) +@click.option( + "--http-conn-pool-size", + help="A size of a connection pool for synchronous communications over the HTTP" + " protocol with the Qserv Worker Ingest servbers. The default value is 0," + " which assumes that the pool size is determined by an implementation of" + " the underlying library 'libcurl'. The number of connectons in a production" + " Qserv deployment should be at least the number of workers in the deployment.", + default=0, + show_default=True, +) +@click.option( + "--czar-name", + help="The unique name of the Czar instance.", + default="http", + show_default=True, +) @option_log_cfg_file() @option_repl_instance_id(required=True) @option_repl_auth_key(required=True) diff --git a/src/czar/CMakeLists.txt b/src/czar/CMakeLists.txt index 719ed4931..cb9498c86 100644 --- a/src/czar/CMakeLists.txt +++ b/src/czar/CMakeLists.txt @@ -3,6 +3,7 @@ add_library(czar OBJECT) target_sources(czar PRIVATE ChttpModule.cc Czar.cc + HttpCzarIngestCsvModule.cc HttpCzarIngestModuleBase.cc HttpCzarIngestModule.cc HttpCzarQueryModule.cc @@ -11,6 +12,7 @@ target_sources(czar PRIVATE HttpSvc.cc MessageTable.cc QhttpModule.cc + WorkerIngestProcessor.cc ) target_include_directories(czar PRIVATE @@ -26,6 +28,7 @@ target_link_libraries(czar PUBLIC log XrdSsiLib cpp-httplib + boost_program_options ) function(CZAR_UTILS) diff --git a/src/czar/HttpCzarIngestCsvModule.cc b/src/czar/HttpCzarIngestCsvModule.cc new file mode 100644 index 
000000000..b3a3dc169 --- /dev/null +++ b/src/czar/HttpCzarIngestCsvModule.cc @@ -0,0 +1,262 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ + +// Class header +#include "czar/HttpCzarIngestCsvModule.h" + +// System headers +#include +#include + +// Third party headers +#include "boost/filesystem.hpp" + +// Qserv headers +#include "cconfig/CzarConfig.h" +#include "czar/WorkerIngestProcessor.h" +#include "http/AsyncReq.h" +#include "http/Client.h" +#include "http/Exceptions.h" +#include "http/RequestBodyJSON.h" +#include "qhttp/Status.h" + +using namespace std; +namespace asio = boost::asio; +namespace cconfig = lsst::qserv::cconfig; +namespace fs = boost::filesystem; +namespace http = lsst::qserv::http; +using json = nlohmann::json; + +namespace lsst::qserv::czar { + +void HttpCzarIngestCsvModule::process(asio::io_service& io_service, string const& context, + string const& tmpDir, httplib::Request const& req, + httplib::Response& resp, httplib::ContentReader const& contentReader, + shared_ptr const& clientConnPool, + shared_ptr const& workerIngestProcessor, + http::AuthType const authType) { + HttpCzarIngestCsvModule module(io_service, context, tmpDir, req, resp, contentReader, clientConnPool, + workerIngestProcessor); + 
string const subModuleName; + module.execute(subModuleName, authType); +} + +HttpCzarIngestCsvModule::HttpCzarIngestCsvModule(asio::io_service& io_service, string const& context, + string const& tmpDir, httplib::Request const& req, + httplib::Response& resp, + httplib::ContentReader const& contentReader, + shared_ptr const& clientConnPool, + shared_ptr const& workerIngestProcessor) + : http::FileUploadModule(cconfig::CzarConfig::instance()->replicationAuthKey(), + cconfig::CzarConfig::instance()->replicationAdminAuthKey(), req, resp, + contentReader), + HttpCzarIngestModuleBase(io_service), + _context(context), + _tmpDir(tmpDir), + _clientConnPool(clientConnPool), + _workerIngestProcessor(workerIngestProcessor) {} + +HttpCzarIngestCsvModule::~HttpCzarIngestCsvModule() { + if (!_csvFileName.empty()) { + boost::system::error_code ec; + fs::remove(_csvFileName, ec); + if (ec.value() != 0) { + warn(__func__, "failed to delete the data file " + _csvFileName + ", error: " + ec.message()); + } + } +} + +string HttpCzarIngestCsvModule::context() const { return _context; } + +void HttpCzarIngestCsvModule::onStartOfFile(string const& name, string const& fileName, + string const& contentType) { + debug(__func__, "name: '" + name + "', fileName: '" + fileName + "', contentType: '" + contentType + "'"); + if (name == "rows") { + if (!_csvFileName.empty()) { + throw http::Error(__func__, "the data file is already uploaded"); + } + boost::system::error_code ec; + fs::path const uniqueFileName = fs::unique_path("http-ingest-%%%%-%%%%-%%%%-%%%%.csv", ec); + if (ec.value() != 0) { + throw http::Error(__func__, "failed to generate a unique file name, error: " + ec.message()); + } + _csvFileName = _tmpDir + "/" + uniqueFileName.string(); + _csvFile.open(_csvFileName, ios::binary); + if (!_csvFile.is_open()) { + throw http::Error(__func__, "failed to open the data file " + _csvFileName + " for writing"); + } + } else if (name == "schema") { + if (!_schema.empty()) { + throw 
http::Error(__func__, "the schema file is already uploaded"); + } + } else if (name == "indexes") { + if (!_indexes.empty()) { + throw http::Error(__func__, "the indexes file is already uploaded"); + } + } else { + throw http::Error(__func__, "unexpected file name: " + name); + } + _name = name; +} + +void HttpCzarIngestCsvModule::onFileData(char const* data, size_t length) { + debug(__func__, "name: '" + _name + "', length: " + to_string(length)); + if (_name == "rows") { + _csvFile.write(data, length); + } else if (_name == "schema") { + _schema.append(data, length); + } else if (_name == "indexes") { + _indexes.append(data, length); + } else { + throw http::Error(__func__, "unexpected file name: " + _name); + } +} + +void HttpCzarIngestCsvModule::onEndOfFile() { + debug(__func__); + if (_name == "rows") { + _csvFile.close(); + } else if (_name == "schema") { + try { + body().objJson[_name] = json::parse(_schema); + } catch (exception const& ex) { + throw http::Error(__func__, "failed to parse the schema file: " + string(ex.what())); + } + } else if (_name == "indexes") { + try { + body().objJson[_name] = json::parse(_indexes); + } catch (exception const& ex) { + throw http::Error(__func__, "failed to parse the indexes file: " + string(ex.what())); + } + } else { + throw http::Error(__func__, "unexpected file name: " + _name); + } +} + +json HttpCzarIngestCsvModule::onEndOfBody() { + debug(__func__); + checkApiVersion(__func__, 39); + + _databaseName = body().required("database"); + _tableName = body().required("table"); + _charsetName = body().optional("charset_name", "latin1"); + _fieldsTerminatedBy = body().optional("fields_terminated_by", R"(\t)"); + _fieldsEnclosedBy = body().optional("fields_enclosed_by", R"(\0)"); + _fieldsEscapedBy = body().optional("fields_escaped_by", R"(\\)"); + _linesTerminatedBy = body().optional("lines_terminated_by", R"(\n)"); + + setTimeoutSec(max(1U, body().optional("timeout", timeoutSec()))); + + debug(__func__, "database: '" + 
_databaseName + "'"); + debug(__func__, "table: '" + _tableName + "'"); + debug(__func__, "charsetName: '" + _charsetName + "'"); + debug(__func__, "fields_terminated_by: '" + _fieldsTerminatedBy + "'"); + debug(__func__, "fields_enclosed_by: '" + _fieldsEnclosedBy + "'"); + debug(__func__, "fields_escaped_by: '" + _fieldsEscapedBy + "'"); + debug(__func__, "lines_terminated_by: '" + _linesTerminatedBy + "'"); + debug(__func__, "timeout: " + to_string(timeoutSec())); + debug(__func__, "data file name: '" + _csvFileName + "'"); + + verifyUserDatabaseName(__func__, _databaseName); + verifyUserTableName(__func__, _tableName); + + // TODO: check if the required data file (CSV) was uploaded and saved to disk + if (_csvFileName.empty()) { + throw http::Error(__func__, "data file is missing in the request"); + } + + // Table schema is required to be an array of column descriptors + if (!body().has("schema")) { + throw http::Error(__func__, "table schema definition is missing in the request"); + } + json const& schema = body().objJson.at("schema"); + if (!schema.is_array()) { + throw http::Error(__func__, "table schema found in the request is not the JSON array"); + } + if (schema.empty()) { + throw http::Error(__func__, "table schema in the request is empty"); + } + + // The index definitions are optional and are expected to be an array of index descriptors. + json indexes = json::array(); + if (body().has("indexes")) { + indexes = body().objJson.at("indexes"); + if (!indexes.is_array()) { + throw http::Error(__func__, "index definitions found in the request is not the JSON array"); + } + } + + // Make changes to the persistent state of Qserv and the Replication/Ingest system. 
+ list> const warnings = ingestData( + _databaseName, _tableName, schema, indexes, + [&](uint32_t transactionId) -> map { return _pushDataToWorkers(transactionId); }); + for (auto const& warning : warnings) { + warn(warning.first, warning.second); + } + return json(); +} + +map HttpCzarIngestCsvModule::_pushDataToWorkers(uint32_t transactionId) { + list mimeData = {{"transaction_id", to_string(transactionId), "", ""}, + {"table", _tableName, "", ""}, + {"chunk", "0", "", ""}, + {"overlap", "0", "", ""}, + {"charset_name", _charsetName, "", ""}, + {"fields_terminated_by", _fieldsTerminatedBy, "", ""}, + {"fields_enclosed_by", _fieldsEnclosedBy, "", ""}, + {"fields_escaped_by", _fieldsEscapedBy, "", ""}, + {"lines_terminated_by", _linesTerminatedBy, "", ""}, + {"rows", "", _csvFileName, "text/csv"}}; + setProtocolFields(mimeData); + + // Send table data to all eligible workers. + auto const resultQueue = ingest::ResultQueue::create(); + auto const workers = getWorkerIds(); + for (auto const& workerId : workers) { + ingest::Request request( + [&]() -> ingest::Result { + ingest::Result result{workerId, ""}; + try { + auto const req = syncCsvRequestWorker(workerId, mimeData, _clientConnPool); + auto const resp = req->readAsJson(); + if (resp.at("success").get() == 0) { + result.error = "error: " + resp.at("error").get(); + } + } catch (exception const& ex) { + result.error = "ex: " + string(ex.what()); + } + return result; + }, + resultQueue); + _workerIngestProcessor->push(request); + } + // Wait for responses and analyze completion status of each worker request. + // The loop will block until all workers have completed their tasks and results are collected + // or the timeout is reached. 
+ map errors; + for (auto const& workerId : workers) { + auto const result = resultQueue->pop(); + if (!result.error.empty()) errors[workerId] = result.error; + } + return errors; +} + +} // namespace lsst::qserv::czar diff --git a/src/czar/HttpCzarIngestCsvModule.h b/src/czar/HttpCzarIngestCsvModule.h new file mode 100644 index 000000000..0eacd2fdb --- /dev/null +++ b/src/czar/HttpCzarIngestCsvModule.h @@ -0,0 +1,130 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . 
+ */ +#ifndef LSST_QSERV_CZAR_HTTPCZARINGESTCSVMODULE_H +#define LSST_QSERV_CZAR_HTTPCZARINGESTCSVMODULE_H + +// System headers +#include +#include +#include + +// Third party headers +#include "boost/asio.hpp" +#include "nlohmann/json.hpp" + +// Qserv headers +#include "czar/HttpCzarIngestModuleBase.h" +#include "http/FileUploadModule.h" + +// Forward declarations + +namespace lsst::qserv::czar::ingest { +class Processor; +} // namespace lsst::qserv::czar::ingest + +namespace lsst::qserv::http { +class ClientConnPool; +} // namespace lsst::qserv::http + +namespace httplib { +class ContentReader; +class Request; +class Response; +} // namespace httplib + +// This header declarations +namespace lsst::qserv::czar { + +/** + * Class HttpCzarIngestCsvModule implements a handler for processing requests for ingesting + * user-generated data products via the HTTP-based frontend. The requests are expected to + * contain CSV data, JSON schema and the relevant parameters in the multipart/form-data body of + * the request. + */ +class HttpCzarIngestCsvModule : public http::FileUploadModule, public HttpCzarIngestModuleBase {
public: + static void process(boost::asio::io_service& io_service, std::string const& context, + std::string const& tmpDir, httplib::Request const& req, httplib::Response& resp, + httplib::ContentReader const& contentReader, + std::shared_ptr const& clientConnPool, + std::shared_ptr const& workerIngestProcessor, + http::AuthType const authType = http::AuthType::NONE); + + HttpCzarIngestCsvModule() = delete; + HttpCzarIngestCsvModule(HttpCzarIngestCsvModule const&) = delete; + HttpCzarIngestCsvModule& operator=(HttpCzarIngestCsvModule const&) = delete; + + /// Destructor is responsible for cleaning up the temporary files. 
+ virtual ~HttpCzarIngestCsvModule(); + +protected: + virtual std::string context() const final; + virtual void onStartOfFile(std::string const& name, std::string const& fileName, + std::string const& contentType) final; + virtual void onFileData(char const* data, std::size_t length) final; + virtual void onEndOfFile() final; + virtual nlohmann::json onEndOfBody() final; + +private: + HttpCzarIngestCsvModule(boost::asio::io_service& io_service, std::string const& context, + std::string const& tmpDir, httplib::Request const& req, httplib::Response& resp, + httplib::ContentReader const& contentReader, + std::shared_ptr const& clientConnPool, + std::shared_ptr const& workerIngestProcessor); + + /// Ingest the table data into the Qserv and the Replication/Ingest system in a context of + /// the given transaction. + std::map _pushDataToWorkers(std::uint32_t transactionId); + + // Input parameters + + /// The context string for posting messages into the logging stream. + std::string const _context; + + /// The temporary directory for storing the uploaded files. + std::string const _tmpDir; + + /// The HTTP connection pool for communications with workers. + std::shared_ptr const _clientConnPool; + + /// The ingest processor for uploading data to workers. + std::shared_ptr const _workerIngestProcessor; + + // The following parameters are used to store the uploaded files. + std::string _name; ///< The name of a file entry that is open ("rows", "schema" or "indexes"). + std::string _csvFileName; ///< The name of the CSV file in the temporary directory. + std::ofstream _csvFile; ///< The output stream for the CSV file. + std::string _schema; ///< The schema payload before parsing it into the JSON object. + std::string _indexes; ///< The indexes payload before parsing it into the JSON object. + + // The following parameters are parsed from the request body. 
+ std::string _databaseName; + std::string _tableName; + std::string _charsetName; + std::string _fieldsTerminatedBy; + std::string _fieldsEnclosedBy; + std::string _fieldsEscapedBy; + std::string _linesTerminatedBy; +}; + +} // namespace lsst::qserv::czar + +#endif // LSST_QSERV_CZAR_HTTPCZARINGESTCSVMODULE_H diff --git a/src/czar/HttpCzarIngestModuleBase.cc b/src/czar/HttpCzarIngestModuleBase.cc index d9fc6d917..b68d566b8 100644 --- a/src/czar/HttpCzarIngestModuleBase.cc +++ b/src/czar/HttpCzarIngestModuleBase.cc @@ -35,6 +35,9 @@ #include "cconfig/CzarConfig.h" #include "http/AsyncReq.h" #include "http/BinaryEncoding.h" +#include "http/Client.h" +#include "http/ClientConfig.h" +#include "http/ClientConnPool.h" #include "http/Exceptions.h" #include "http/MetaModule.h" #include "http/RequestBodyJSON.h" @@ -340,6 +343,16 @@ shared_ptr HttpCzarIngestModuleBase::_asyncPostRequest(string co return request; } +shared_ptr HttpCzarIngestModuleBase::_syncMimePostRequest( + string const& url, list const& mimeData, + shared_ptr const& connPool) { + vector const headers; + http::ClientConfig clientConfig; + clientConfig.connectTimeout = _timeoutSec; + clientConfig.timeout = _timeoutSec; + return make_shared(url, mimeData, headers, clientConfig, connPool); +} + void HttpCzarIngestModuleBase::setProtocolFields(json& data) const { data["version"] = http::MetaModule::version; data["instance_id"] = cconfig::CzarConfig::instance()->replicationInstanceId(); @@ -347,4 +360,15 @@ void HttpCzarIngestModuleBase::setProtocolFields(json& data) const { data["admin_auth_key"] = cconfig::CzarConfig::instance()->replicationAdminAuthKey(); } +void HttpCzarIngestModuleBase::setProtocolFields(list& mimeData) const { + // IMPORTANT: The order of the fields is important in the MIMEPOST request. Non-file + // fields should be placed before the file field. The collection that is being amended + // by this method may already contain some fields, including the file fields. 
+ mimeData.push_front({"version", to_string(http::MetaModule::version), "", ""}); + mimeData.push_front({"instance_id", cconfig::CzarConfig::instance()->replicationInstanceId(), "", ""}); + mimeData.push_front({"auth_key", cconfig::CzarConfig::instance()->replicationAuthKey(), "", ""}); + mimeData.push_front( + {"admin_auth_key", cconfig::CzarConfig::instance()->replicationAdminAuthKey(), "", ""}); +} + } // namespace lsst::qserv::czar diff --git a/src/czar/HttpCzarIngestModuleBase.h b/src/czar/HttpCzarIngestModuleBase.h index 863a0b03f..6faac7105 100644 --- a/src/czar/HttpCzarIngestModuleBase.h +++ b/src/czar/HttpCzarIngestModuleBase.h @@ -40,6 +40,9 @@ namespace lsst::qserv::http { class AsyncReq; +class Client; +class ClientConnPool; +class ClientMimeEntry; } // namespace lsst::qserv::http // This header declarations @@ -136,12 +139,34 @@ class HttpCzarIngestModuleBase { return _asyncPostRequest(_worker(workerId) + "/ingest/data", data); } + /** + * Create a synchronous MIMEPOST request to the specified Replication Worker. + * @note The request won't be started. It's up to a caller to do so. The duration of + * the request is limited by the optional timeout attribute set by calling the method + * setTimeoutSec(). + * @param workerId The worker's identifier. + * @param mimeData The collection of the mime descriptors to be sent with the request. + * @param connPool The optional connection pool. + * @return std::shared_ptr A pointer to the request object. + */ + std::shared_ptr syncCsvRequestWorker( + std::string const& workerId, std::list const& mimeData, + std::shared_ptr const& connPool = nullptr) { + return _syncMimePostRequest(_worker(workerId) + "/ingest/csv", mimeData, connPool); + } + /** * Set the protocol fields in the JSON object. * @param data The JSON object to be updated. */ void setProtocolFields(nlohmann::json& data) const; + /** + * Set the protocol fields in a collection of the mime descriptors. 
+ * @param mimeData The collection of the descriptors to be updated. + */ + void setProtocolFields(std::list& mimeData) const; + private: // The following methods are used to interact with the Replication Controller. // The methods throw http::Error or other exceptions in case of communication @@ -257,6 +282,19 @@ class HttpCzarIngestModuleBase { */ std::shared_ptr _asyncPostRequest(std::string const& url, std::string const& data); + /** + * Create a synchronous MIMEPOST request to the server. + * @note The request won't be started. It's up to a caller to do so. + * @param url A complete URL for the REST service to be called. + * @param mimeData The collection of the mime descriptors to be sent with the request. + * @param connPool The optional connection pool. + * @return std::shared_ptr A pointer to the request object. + * @throw http::Error for specific errors reported by the client library. + */ + std::shared_ptr _syncMimePostRequest( + std::string const& url, std::list const& mimeData, + std::shared_ptr const& connPool = nullptr); + /// I/O service for async TCP communications. 
boost::asio::io_service& _io_service; diff --git a/src/czar/HttpCzarSvc.cc b/src/czar/HttpCzarSvc.cc index 026855a54..949dda54a 100644 --- a/src/czar/HttpCzarSvc.cc +++ b/src/czar/HttpCzarSvc.cc @@ -33,8 +33,11 @@ // Qserv headers #include "cconfig/CzarConfig.h" +#include "czar/HttpCzarIngestCsvModule.h" #include "czar/HttpCzarIngestModule.h" #include "czar/HttpCzarQueryModule.h" +#include "czar/WorkerIngestProcessor.h" +#include "http/ClientConnPool.h" #include "http/ChttpMetaModule.h" // LSST headers @@ -58,17 +61,26 @@ void throwIf(bool condition, string const& message) { namespace lsst::qserv::czar { -shared_ptr HttpCzarSvc::create(int port, unsigned int numThreads, string const& sslCertFile, - string const& sslPrivateKeyFile) { - return shared_ptr(new HttpCzarSvc(port, numThreads, sslCertFile, sslPrivateKeyFile)); +shared_ptr HttpCzarSvc::create(int port, unsigned int numThreads, + unsigned int numWorkerIngestThreads, string const& sslCertFile, + string const& sslPrivateKeyFile, string const& tmpDir, + unsigned int clientConnPoolSize) { + return shared_ptr(new HttpCzarSvc(port, numThreads, numWorkerIngestThreads, sslCertFile, + sslPrivateKeyFile, tmpDir, clientConnPoolSize)); } -HttpCzarSvc::HttpCzarSvc(int port, unsigned int numThreads, string const& sslCertFile, - string const& sslPrivateKeyFile) +HttpCzarSvc::HttpCzarSvc(int port, unsigned int numThreads, unsigned int numWorkerIngestThreads, + string const& sslCertFile, string const& sslPrivateKeyFile, string const& tmpDir, + unsigned int clientConnPoolSize) : _port(port), _numThreads(numThreads), + _numWorkerIngestThreads(numWorkerIngestThreads), _sslCertFile(sslCertFile), - _sslPrivateKeyFile(sslPrivateKeyFile) { + _sslPrivateKeyFile(sslPrivateKeyFile), + _tmpDir(tmpDir), + _clientConnPool(make_shared(clientConnPoolSize)), + _workerIngestProcessor(ingest::Processor::create( + numWorkerIngestThreads ? 
numWorkerIngestThreads : thread::hardware_concurrency())) { _createAndConfigure(); } @@ -103,7 +115,10 @@ void HttpCzarSvc::_createAndConfigure() { _svr = make_unique(_sslCertFile.data(), _sslPrivateKeyFile.data()); ::throwIf(!_svr->is_valid(), context + "Failed to create the server"); - _svr->new_task_queue = [&] { return new httplib::ThreadPool(_numThreads, _maxQueuedRequests); }; + _svr->new_task_queue = [&] { + return new httplib::ThreadPool(_numThreads ? _numThreads : thread::hardware_concurrency(), + _maxQueuedRequests); + }; if (_port == 0) { _port = _svr->bind_to_any_port(_bindAddr, _port); ::throwIf(_port < 0, context + "Failed to bind the server to any port"); @@ -141,6 +156,11 @@ void HttpCzarSvc::_registerHandlers() { _svr->Get("/query-async/result/:qid", [self](httplib::Request const& req, httplib::Response& resp) { HttpCzarQueryModule::process(::serviceName, req, resp, "RESULT"); }); + _svr->Post("/ingest/csv", [self](httplib::Request const& req, httplib::Response& resp, + httplib::ContentReader const& contentReader) { + HttpCzarIngestCsvModule::process(self->_io_service, ::serviceName, self->_tmpDir, req, resp, + contentReader, self->_clientConnPool, self->_workerIngestProcessor); + }); _svr->Post("/ingest/data", [self](httplib::Request const& req, httplib::Response& resp) { HttpCzarIngestModule::process(self->_io_service, ::serviceName, req, resp, "INGEST-DATA"); }); diff --git a/src/czar/HttpCzarSvc.h b/src/czar/HttpCzarSvc.h index 6106d458e..db5bec569 100644 --- a/src/czar/HttpCzarSvc.h +++ b/src/czar/HttpCzarSvc.h @@ -31,6 +31,14 @@ #include "boost/asio.hpp" // Forward declarations +namespace lsst::qserv::czar::ingest { +class Processor; +} // namespace lsst::qserv::czar::ingest + +namespace lsst::qserv::http { +class ClientConnPool; +} // namespace lsst::qserv::http + namespace httplib { class SSLServer; } // namespace httplib @@ -48,35 +56,41 @@ namespace lsst::qserv::czar { class HttpCzarSvc : public std::enable_shared_from_this { public: 
static std::shared_ptr create(int port, unsigned int numThreads, + unsigned int numWorkerIngestThreads, std::string const& sslCertFile, - std::string const& sslPrivateKeyFile); + std::string const& sslPrivateKeyFile, + std::string const& tmpDir, + unsigned int clientConnPoolSize = 0); int port() const { return _port; } void startAndWait(); private: - HttpCzarSvc(int port, unsigned int numThreads, std::string const& sslCertFile, - std::string const& sslPrivateKeyFile); + HttpCzarSvc(int port, unsigned int numThreads, unsigned int numWorkerIngestThreads, + std::string const& sslCertFile, std::string const& sslPrivateKeyFile, + std::string const& tmpDir, unsigned int clientConnPoolSize = 0); void _createAndConfigure(); void _registerHandlers(); int _port; unsigned int const _numThreads; + unsigned int const _numWorkerIngestThreads; std::string const _sslCertFile; std::string const _sslPrivateKeyFile; + std::string const _tmpDir; std::size_t const _maxQueuedRequests = 0; // 0 means unlimited std::string const _bindAddr = "0.0.0.0"; std::unique_ptr _svr; - // The BOOST ASIO I/O services and a thread pool for async communication with - // the Replication Controller and workers. - // TODO: Consider a configuration option for setting the desired number - // of threads in the pool. - - unsigned int const _numBoostAsioThreads = 2; + /// The BOOST ASIO I/O services and a thread pool for async communication with + /// the Replication Controller and workers. 
+ unsigned int const _numBoostAsioThreads = std::thread::hardware_concurrency(); std::unique_ptr _work; boost::asio::io_service _io_service; std::vector> _threads; + + std::shared_ptr const _clientConnPool; + std::shared_ptr const _workerIngestProcessor; }; } // namespace lsst::qserv::czar diff --git a/src/czar/WorkerIngestProcessor.cc b/src/czar/WorkerIngestProcessor.cc new file mode 100644 index 000000000..a7a9d0a56 --- /dev/null +++ b/src/czar/WorkerIngestProcessor.cc @@ -0,0 +1,50 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . 
+ */ + +// Class header +#include "czar/WorkerIngestProcessor.h" + +using namespace std; + +namespace lsst::qserv::czar::ingest { + +Request::Request(function const& processor, shared_ptr resultQueue) + : _processor(processor), _resultQueue(resultQueue) {} + +void Request::process() { _resultQueue->push(_processor()); } + +shared_ptr Processor::create(unsigned int numThreads) { + return shared_ptr(new Processor(numThreads)); +} + +void Processor::push(Request const& req) { _requestQueue->push(req); } + +Processor::Processor(unsigned int numThreads) : _requestQueue(RequestQueue::create()) { + for (unsigned int i = 0; i < numThreads; ++i) { + _threads.push_back(thread([this]() { + while (true) { + _requestQueue->pop().process(); + } + })); + } +} + +} // namespace lsst::qserv::czar::ingest diff --git a/src/czar/WorkerIngestProcessor.h b/src/czar/WorkerIngestProcessor.h new file mode 100644 index 000000000..ae073993c --- /dev/null +++ b/src/czar/WorkerIngestProcessor.h @@ -0,0 +1,115 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . 
+ */ +#ifndef LSST_QSERV_CZAR_WORKERINGESTPROCESSOR_H +#define LSST_QSERV_CZAR_WORKERINGESTPROCESSOR_H + +// System headers +#include +#include +#include +#include +#include +#include +#include +#include + +// This header declarations +namespace lsst::qserv::czar::ingest { + +/** + * The synchronized queue class. The class is used for storing the worker ingest requests + * and results. + */ +template +class Queue : public std::enable_shared_from_this> { +public: + static std::shared_ptr> create() { + return std::shared_ptr>(new Queue()); + } + Queue(Queue const&) = delete; + Queue& operator=(Queue const&) = delete; + void push(Entry const& entry) { + std::lock_guard lock(_mtx); + _entries.push_back(entry); + _cv.notify_one(); + } + Entry pop() { + std::unique_lock lock(_mtx); + _cv.wait(lock, [this]() { return !_entries.empty(); }); + Entry const result = _entries.front(); + _entries.pop_front(); + return result; + } + +private: + Queue() = default; + std::mutex _mtx; + std::condition_variable _cv; + std::list _entries; +}; + +/** + * A structure for storing the worker ingest result. + */ +struct Result { + std::string worker; + std::string error; +}; +using ResultQueue = Queue; + +/** + * A class for storing the worker ingest request. The class is used for storing the request + * processing function and the result queue where the function result is stored. + * @note The function should not throw exceptions. + */ +class Request { +public: + Request() = default; + Request(Request const&) = default; + Request(std::function const& processor, std::shared_ptr resultQueue); + void process(); + +private: + std::function const _processor; + std::shared_ptr const _resultQueue; +}; +using RequestQueue = Queue; + +/** + * A class for processing the worker ingest requests. The class is used for processing the + * worker ingest requests in parallel by a number of threads. 
+ */ +class Processor : public std::enable_shared_from_this { +public: + static std::shared_ptr create(unsigned int numThreads); + Processor(Processor const&) = delete; + Processor& operator=(Processor const&) = delete; + void push(Request const& req); + +private: + Processor(unsigned int numThreads); + std::vector _threads; + std::shared_ptr _requestQueue; +}; + +} // namespace lsst::qserv::czar::ingest + +#endif // LSST_QSERV_CZAR_WORKERINGESTPROCESSOR_H diff --git a/src/czar/qserv-czar-http.cc b/src/czar/qserv-czar-http.cc index 85a0c451d..f3a73fa9e 100644 --- a/src/czar/qserv-czar-http.cc +++ b/src/czar/qserv-czar-http.cc @@ -28,63 +28,110 @@ #include #include #include +#include + +// Third party headers +#include "boost/program_options.hpp" // Qserv headers #include "czar/Czar.h" #include "czar/HttpCzarSvc.h" -#include "global/stringUtil.h" // for qserv::stoui using namespace std; +namespace po = boost::program_options; namespace czar = lsst::qserv::czar; namespace qserv = lsst::qserv; namespace { - -string const usage = "Usage: "; - +char const* const help = "The HTTP-based Czar frontend."; +char const* const context = "[CZAR-HTTP-FRONTEND]"; } // namespace int main(int argc, char* argv[]) { - // Parse command-line parameters to get: - // - the name of Czar - // - a path to the configuration files - // - the port number (0 value would result in allocating the first available port) - // - the number of service threads (0 value would assume the number of host machine's - // hardware threads) - // - a location of the SSL/TSL certificate for the secure connections - // - a location of the SSL/TSL private key - if (argc != 7) { - cerr << __func__ << ": insufficient number of the command-line parameters\n" << ::usage << endl; - return 1; + po::options_description desc("", 120); + desc.add_options()("help,h", "Print this help message and exit."); + desc.add_options()("verbose,v", "Produce verbose output."); + desc.add_options()("czar-name", 
po::value()->default_value("http"), + "The name of this Czar frontend. Assign a unique name to each Czar."); + desc.add_options()("config", po::value()->default_value("/config-etc/qserv-czar.cnf"), + "The configuration file."); + desc.add_options()("port", po::value()->default_value(4048), + "HTTP/HTTPS port of the REST API."); + desc.add_options()("threads", po::value()->default_value(thread::hardware_concurrency()), + "The number of the request processing threads in the REST service." + " The default value is the number of hardware threads. Zero value is not allowed."); + desc.add_options()("worker-ingest-threads", + po::value()->default_value(thread::hardware_concurrency()), + "A size of a thread pool for pushing table contributions to workers over" + " the synchronous HTTP/HTTPS protocol. The default value is the number of" + " hardware threads. Zero value is not allowed."); + desc.add_options()("ssl-cert-file", po::value()->default_value("/config-etc/ssl/czar-cert.pem"), + "The SSL/TSL certificate file."); + desc.add_options()("ssl-private-key-file", + po::value()->default_value("/config-etc/ssl/czar-key.pem"), + "The SSL/TSL private key file."); + desc.add_options()("tmp-dir", po::value()->default_value("/tmp"), + "The temporary directory for the service."); + desc.add_options()("conn-pool-size", po::value()->default_value(0), + "A size of a connection pool for synchronous communications over the HTTP" + " protocol with the Qserv Worker Ingest servbers. The default value is 0," + " which assumes that the pool size is determined by an implementation of" + " the underlying library 'libcurl'. 
The number of connectons in a production" + " Qserv deployment should be at least the number of workers in the deployment."); + + po::variables_map vm; + po::store(po::parse_command_line(argc, const_cast(argv), desc), vm); + po::notify(vm); + + string const czarName = vm["czar-name"].as(); + string const configFilePath = vm["config"].as(); + uint16_t const port = vm["port"].as(); + + unsigned int const numThreads = vm["threads"].as(); + if (numThreads == 0) { + throw invalid_argument( + "The number of threads in command line option '--threads'" + " must be greater than zero"); } - int nextArg = 1; - string const czarName = argv[nextArg++]; - string const configFilePath = argv[nextArg++]; - uint16_t port = 0; - unsigned int numThreads = 0; - try { - int const portParsed = stoi(argv[nextArg++]); - if (portParsed < 0 || portParsed > numeric_limits::max()) { - cerr << __func__ << ": the port number is not valid\n" << ::usage << endl; - return 1; - } - port = static_cast(portParsed); - numThreads = qserv::stoui(argv[nextArg++]); - if (numThreads == 0) numThreads = thread::hardware_concurrency(); - } catch (exception const& ex) { - cerr << __func__ << ": failed to parse command line parameters\n" << ::usage << endl; - return 1; + + unsigned int const numWorkerIngestThreads = vm["worker-ingest-threads"].as(); + if (numWorkerIngestThreads == 0) { + throw invalid_argument( + "The number of threads in command line option '--worker-ingest-threads'" + " must be greater than zero"); + } + string const sslCertFile = vm["ssl-cert-file"].as(); + string const sslPrivateKeyFile = vm["ssl-private-key-file"].as(); + string const tmpDir = vm["tmp-dir"].as(); + unsigned int connPoolSize = vm["conn-pool-size"].as(); + + if (vm.count("help")) { + cout << argv[0] << " [options]\n\n" << ::help << "\n\n" << desc << endl; + return 0; + } + bool const verbose = vm.count("verbose") > 0; + if (verbose) { + cout << ::context << " Czar name: " << czarName << "\n" + << ::context << " Configuration 
file: " << configFilePath << "\n" + << ::context << " Port: " << port << "\n" + << ::context << " Number of threads: " << numThreads << "\n" + << ::context << " Number of worker ingest threads: " << numWorkerIngestThreads << "\n" + << ::context << " SSL certificate file: " << sslCertFile << "\n" + << ::context << " SSL private key file: " << sslPrivateKeyFile << "\n" + << ::context << " Temporary directory: " << tmpDir << "\n" + << ::context << " Connection pool size: " << connPoolSize << endl; } - string const sslCertFile = argv[nextArg++]; - string const sslPrivateKeyFile = argv[nextArg++]; try { auto const czar = czar::Czar::createCzar(configFilePath, czarName); - auto const svc = czar::HttpCzarSvc::create(port, numThreads, sslCertFile, sslPrivateKeyFile); - cout << __func__ << ": HTTP-based query processing service of Czar bound to port: " << svc->port() - << endl; + auto const svc = czar::HttpCzarSvc::create(port, numThreads, numWorkerIngestThreads, sslCertFile, + sslPrivateKeyFile, tmpDir, connPoolSize); + if (verbose) { + cout << ::context << " The query processing service of Czar bound to port: " << svc->port() + << endl; + } svc->startAndWait(); } catch (exception const& ex) { - cerr << __func__ << ": the application failed, exception: " << ex.what() << endl; + cerr << ::context << " The application failed, exception: " << ex.what() << endl; return 1; } return 0; From 0d6e30c699d2dcbe7563c09e6c3fd2bc4a1c016a Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Wed, 6 Nov 2024 16:40:25 -0800 Subject: [PATCH 6/6] Extended documentation on the HTTP frontend to cover the new service --- doc/admin/data-table-indexes.rst | 14 +- doc/ingest/api/advanced/directors.rst | 2 +- doc/ingest/api/advanced/transactions.rst | 1 - doc/ingest/api/concepts/overview.rst | 8 +- doc/ingest/api/concepts/transactions.rst | 19 +- doc/ingest/api/index.rst | 2 +- doc/user/http-frontend-general.rst | 10 +- doc/user/http-frontend-ingest.rst | 429 +++++++++++++++++------ 
doc/user/http-frontend-query.rst | 6 +- doc/user/http-frontend.rst | 4 +- 10 files changed, 356 insertions(+), 139 deletions(-) diff --git a/doc/admin/data-table-indexes.rst b/doc/admin/data-table-indexes.rst index b6108d206..e8f74075c 100644 --- a/doc/admin/data-table-indexes.rst +++ b/doc/admin/data-table-indexes.rst @@ -247,7 +247,7 @@ Where: The required *base* name of the table where the index will be created. ``overlap`` : *number* := ``0`` - The optional *overlap* flagg indicating a sub-type of the *chunk* table. The value should be one of the following: + The optional *overlap* flag indicating a sub-type of the *chunk* table. The value should be one of the following: - ``1`` : *full overlap* - ``0`` : *chunk* @@ -301,9 +301,9 @@ Where: types (numeric, etc.). Otherwise, an index creation request will fail. ``ascending`` : *number* - The required sorting order of the column in the index. It translates into ``ASC`` or ``DESC`` optiona - in the key definition in ``key_part``. A value of ``0`` will be interpreted as ``DESC``. Any other positive number - will be imterpreted as to ``ASC``. + The required sorting order of the column in the index. It translates into ``ASC`` or ``DESC`` options + in the key definition in ``key_part``. A value of ``0`` will be interpreted as ``DESC``. + Any other positive number will be interpreted as ``ASC``. ``auth_key`` : *string* The required zauthorization key. @@ -388,7 +388,7 @@ Where: The required *base* name of the table where the index will be created. ``overlap`` : *number* := ``0`` - The optional *overlap* flagg indicating a sub-type of the *chunk* table. The value should be one of the following: + The optional *overlap* flag indicating a sub-type of the *chunk* table. The value should be one of the following: - ``1`` : *full overlap* - ``0`` : *chunk* @@ -439,10 +439,10 @@ Where the service path has the following parameters: ``table`` : *string* The name of the table for which the indexes are required to be collected. 
-The optional query parameyter is +The optional query parameter is ``overlap`` : *number* := ``0`` - The optional *overlap* flagg indicating a sub-type of the *chunk* table. The value should be one of the following: + The optional *overlap* flag indicating a sub-type of the *chunk* table. The value should be one of the following: - ``1`` : *full overlap* - ``0`` : *chunk* diff --git a/doc/ingest/api/advanced/directors.rst b/doc/ingest/api/advanced/directors.rst index 0e0efc82b..92b5c5273 100644 --- a/doc/ingest/api/advanced/directors.rst +++ b/doc/ingest/api/advanced/directors.rst @@ -87,7 +87,7 @@ The following table illustrates how JSON configurations for all the above-mentio | "longitude_key" : "dec" | | | } | **Note**: Attributes ``latitude_key`` and | | | ``longitude_key`` were not provided. | - | | is allowed for the dependent tables. | + | | This is allowed for the dependent tables. | | | | | | .. code-block:: json | | | | diff --git a/doc/ingest/api/advanced/transactions.rst b/doc/ingest/api/advanced/transactions.rst index 07291839c..67768a1d8 100644 --- a/doc/ingest/api/advanced/transactions.rst +++ b/doc/ingest/api/advanced/transactions.rst @@ -119,7 +119,6 @@ Things to consider: factor limiting the performance of the workflow. - Any failure to ingest a contribution will result in aborting the entire transaction. This can significantly impact the workflow's performance, especially if the amount of data to be ingested is large. - impact the workflow's performance, especially if the amount of data to be ingested is large. Best use: diff --git a/doc/ingest/api/concepts/overview.rst b/doc/ingest/api/concepts/overview.rst index 35b94d31d..b1b39d28b 100644 --- a/doc/ingest/api/concepts/overview.rst +++ b/doc/ingest/api/concepts/overview.rst @@ -69,8 +69,8 @@ it should be registered using: - :ref:`ingest-db-table-management-register-db` (REST) -The newely registered database will be always in the *unpublished* state. 
If the database already exists in -the Replication/Ingest and it's in the *published* state it should be *unpublished* state using: +The newly registered database will always be in the *unpublished* state. If the database already exists in +the Replication/Ingest and it's in the *published* state it should be put into the *unpublished* state using: - :ref:`ingest-db-table-management-unpublish-db` (REST) @@ -159,8 +159,8 @@ The REST API for initiating the contribuiton requests is covered in the followin Monitor the progress of the ingest activities ---------------------------------------------- -The workflow should always be avare about the progress of the ingest activities, and about the status of the -contribution requests. This is need for (at least) three reasons: +The workflow should always be aware of the progress of the ingest activities, and of the status of the +contribution requests. This is needed for (at least) three reasons: #. To know when the ingest activities are finished #. To know when the ingest activities (and which requests) are failed diff --git a/doc/ingest/api/concepts/transactions.rst b/doc/ingest/api/concepts/transactions.rst index b736824e3..bdc9c4b3b 100644 --- a/doc/ingest/api/concepts/transactions.rst +++ b/doc/ingest/api/concepts/transactions.rst @@ -3,14 +3,13 @@ Transactions ============ -The distributed transaction mechanism is one of the key technologies that was -implemented in the Qserv Ingest system to allow for the incremental updates of the overall state of the data and metadata -while ensuring the consistency of the ingested catalogs. Transactions also play an important role in allowing -the high-performance ingest activities to be performed in a distributed environment. Transactions if used correct may -significantly increase the level of parallelism of the ingest workflows. Transactions are not visible to end users. +The distributed transaction mechanism is a crucial technology in the Qserv Ingest system. 
It enables incremental updates +to the data and metadata while maintaining the consistency of ingested catalogs. Transactions also facilitate high-performance +ingest activities in a distributed environment. When used correctly, transactions can significantly enhance the parallelism +of ingest workflows. Note that transactions are not visible to end users. Transactions are open in a scope of a database. It's a responsibility of the workflows to manage transactions as needed -for the ingest activities uisng the following REST services: +for the ingest activities using the following REST services: - :ref:`ingest-trans-management` (REST) @@ -68,10 +67,10 @@ to Qserv users and is queriable. Here is an illustration of a query and the corr Checkpointing ------------- -Transactions also provide the checkpointing mechanism that allows rolling back to a prior consistent state of the affected tables -should any problem occur during the ingest activities. Transactions may spans across many workers and tables located -at the workers. It's up to the workflow to decide what contrubutions to ingest and in what order to ingest those in -a scope of each transaction. +Transactions also provide a checkpointing mechanism that allows rolling back to a prior consistent state of the affected tables +should any problem occur during the ingest activities. Transactions may span across many workers and tables located +at the workers. It's up to the workflow to decide what contributions to ingest and in what order to ingest those in +the scope of each transaction. The following diagram illustrates the concept of the transactions in Qserv. There are 3 transactions that are started and commited sequentially (in the real life scenarios the transactions can be and should be started and commited in parallel, diff --git a/doc/ingest/api/index.rst b/doc/ingest/api/index.rst index 99353ca24..f5ceef389 100644 --- a/doc/ingest/api/index.rst +++ b/doc/ingest/api/index.rst @@ -1,7 +1,7 @@ .. 
note:: - Information in this guide corresponds to the version **38** of the Qserv REST API. Keep in mind + Information in this guide corresponds to the version **39** of the Qserv REST API. Keep in mind that each implementation of the API has a specific version. The version number will change if any changes to the implementation or the API that might affect users will be made. The current document will be kept updated to reflect the latest version of the API. diff --git a/doc/user/http-frontend-general.rst b/doc/user/http-frontend-general.rst index 990dfc4bb..72bdde6ae 100644 --- a/doc/user/http-frontend-general.rst +++ b/doc/user/http-frontend-general.rst @@ -124,7 +124,7 @@ Example: "name" : "http", "id" : 8, "instance_id" : "qserv-prod", - "version" : 38, + "version" : 39, "success" : 1 } @@ -137,7 +137,7 @@ the following request for checking the status of an ongoing query might look lik .. code-block:: bash - curl -k 'https://localhost:4041/query-async/status/1234?version=38' -X GET + curl -k 'https://localhost:4041/query-async/status/1234?version=39' -X GET For other HTTP methods used by the API, the version number must be included in the body of the request as shown below: @@ -145,11 +145,11 @@ For other HTTP methods used by the API, the version number must be included in t curl -k 'https://localhost:4041/query-async' -X POST \ -H 'Content-Type: application/json' \ - -d'{"version":38,"query":"SELECT ..."}' + -d'{"version":39,"query":"SELECT ..."}' If the number does not match expectations, such a request will fail and the service will return the following response. Here is an example of what will happen if the wrong version number ``29`` is specified instead -of ``38`` (as specified in the example above): +of ``39`` (as specified in the example above): .. 
code-block:: json @@ -157,7 +157,7 @@ of ``38`` (as specified in the example above): "error" : "The requested version 29 of the API is not in the range supported by the service.", "error_ext" : { - "max_version" : 38, + "max_version" : 39, "min_version" : 30 }, "warning" : "" diff --git a/doc/user/http-frontend-ingest.rst b/doc/user/http-frontend-ingest.rst index c5f64e845..96911a5fa 100644 --- a/doc/user/http-frontend-ingest.rst +++ b/doc/user/http-frontend-ingest.rst @@ -6,37 +6,89 @@ The User Table Ingest Interface The frontend provides a simple interface for ingesting and managing user-defined tables in Qserv. Key features and limitations include: - Supports creation and ingestion of simple (non-partitioned) tables only. -- Table sizes are constrained by the frontend's available memory: +- Two options for ingesting table data are supported: - - Practical limit is a few GB. - - Limit may be lower for multiple simultaneous table ingest requests. + - :ref:`http-frontend-ingest-json`: Both data and metadata are encapsulated into a single JSON object sent in the request body. + - :ref:`http-frontend-ingest-multipart`: The data is sent as a ``CSV``-formatted file, and metadata + is encapsulated in separate JSON objects. All this information is packaged into the body of a request. -- Only synchronous interface is currently supported. + Each option has its own pros and cons. The choice depends on the specific requirements of the client application. + The ``application/json`` option is more flexible and allows for more control over the ingest process. + However, it may be less efficient for large tables compared to the ``multipart/form-data`` option. + Additionally, with the ``application/json`` option, table sizes are constrained by the frontend's available memory, where: - - **Asynchronous interface is under evaluation.** + - Practical limit is a few GB. + - Limit may be lower for multiple simultaneous table ingest requests. 
-- Schema definition and table data are sent to the frontend in a single JSON object. +- Schema and index definitions are sent to the frontend in the JSON format. - Clients can request optional indexes on ingested tables during the ingest process. +- Only *synchronous* interface is currently supported. - User-defined tables are automatically created by the request processor within the user databases. - The service enforces a specific naming convention for user databases to avoid conflicts with production data products in Qserv. - - The naming convention is detailed in the relevant section of the document. + - The naming convention is detailed in the following section of the document: + + - :ref:`http-frontend-ingest-names` - Users can delete tables or their corresponding databases. - **Currently, the service does not support authentication/authorization or user quota control.** - No backups are provided for tables ingested using this mechanism. -- Binary data types are supported. +- Binary data types are supported in both ingest options. However, there are differences in how binary data is handled: + + - When the table data is sent in a ``multipart/form-data``-formatted request body, the binary data needs to be encoded + as explained in the MySQL documentation: + + - https://dev.mysql.com/doc/refman/8.4/en/load-data.html + + - When the table data is sent in an ``application/json``-formatted request body, the binary data must be converted to + an appropriate format for inclusion in the JSON request object. This is explained in: - - **Binary data must be converted to an appropriate format for inclusion in the JSON request object.** - - See: :ref:`ingest-general-binary-encoding` (REST) + - See: :ref:`ingest-general-binary-encoding` (REST) - The API follows the same :ref:`http-frontend-general-error-reporting` scheme as adopted for :ref:`http-frontend-query`. +.. 
_http-frontend-ingest-names: + +Naming conventions +------------------ + +Most services described in this document require user database and table names. These names must follow the conventions outlined below: + +``database`` : *string* + The name of a user database. The names of the databases must start with the following prefix: + + .. code-block:: + + user_ + + The rest of the name should include the name of a user or a role. For example: + + .. code-block:: + + user_gapon + +``table`` : *string* + The name of a table. Table names may not start with the following prefix: + + .. code-block:: + + qserv_ + + This prefix is reserved for naming internal tables that Qserv places into user databases. + +A failure to follow these conventions will result in an error response from the service. + + Ingesting tables ---------------- +.. _http-frontend-ingest-json: + +application/json +^^^^^^^^^^^^^^^^ + The following REST service implements the synchronous interface for ingesting a table into Qserv: .. list-table:: @@ -46,7 +98,8 @@ The following REST service implements the synchronous interface for ingesting a * - ``POST`` - ``/ingest/data`` -A client must include a JSON object in the request body to specify the operation to be performed. The object follows this schema: +The service requires a ``application/json``-formatted request body. The body must include a single JSON object +to specify the operation to be performed. The object follows this schema: .. code-block:: @@ -62,26 +115,10 @@ A client must include a JSON object in the request body to specify the operation Where: ``database`` : *string* - The required name of a user database. The names of the databases must start with the following prefix: - - .. code-block:: - - user_ - - The rest of the name should include the name of a user or a role. For example: - - .. code-block:: - - user_gapon + The required name of a user database. ``table`` : *string* - The required name of a table. 
Table names may not start with the following prefix: - - .. code-block:: - - qserv_ - - This prefix is reserved for naming internal tables that Qserv places into user databases. + The required name of a table. ``binary_encoding`` : *string* = ``hex`` The optional binary encoding of the binary data in the table. For further details see: @@ -89,28 +126,16 @@ Where: - :ref:`ingest-general-binary-encoding` (REST) ``schema`` : *array* - The required schema definition. Each element of the array defines a column: + The required schema definition. The schema must be a JSON array, where each entry represents a column specification. + More information on the schema specification requirements can be found in the dedicated section of the document: - .. code-block:: - - { "name" : , - "type" : - } - - Where: - - ``name`` - The name of a column - ``type`` - A valid MySQL type - - **Note**: The service preserves the column order when creating a table. + - :ref:`http-frontend-ingest-schema-spec` ``indexes`` : *array* = ``[]`` - The optional indexes will be created after ingesting the table. - + The optional indexes will be created after ingesting the table. See the example below for a scenario when indexes are needed. More information on the index specification requirements can be found in the dedicated section of the document: - :ref:`http-frontend-ingest-indexes`. + + - :ref:`http-frontend-ingest-index-spec` ``rows`` : *array* The required collection of the data rows to be ingested. Each element of the array represents a complete row, @@ -144,7 +169,7 @@ Here is an example of the simple table creation specification: .. 
code-block:: json - { "version" : 38, + { "version" : 39, "database" : "user_gapon", "table" : "employee", "schema" : [ @@ -164,7 +189,7 @@ The description could be pushed to the service using: curl -k 'https://localhost:4041/ingest/data' -X POST \ -H 'Content-Type: application/json' \ - -d'{"version":38,"database":"user_gapon",..}' + -d'{"version":39,"database":"user_gapon",..}' If the request succeeds then the following table will be created: @@ -179,51 +204,12 @@ If the request succeeds then the following table will be created: ) ENGINE=MyISAM DEFAULT CHARSET=latin1; -.. _http-frontend-ingest-indexes: - -Creating indexes -^^^^^^^^^^^^^^^^ - -.. note:: - - For detailed information on the schema of the index specifications, please refer to the following document: - - - :ref:`admin-data-table-index` (ADMIN) - -Indexes, if needed, must be specified in the ``indexes`` attribute of the table creation request. This attribute is a JSON array, -where each entry represents an index specification: - -.. code-block:: - - { "version" : 38, - "indexes" : [ - { "index" : , - "spec" : , - "comment" : , - "columns" : [ - { "column" : , "length" : , "ascending" : }, - ... - ] - }, - ... - ], - ... - } - -A few notes: - -- It is possible to create one or many indexes in such specifications. -- Index names (attribute ``index``) must be unique for the table. -- An index may involve one or many columns as specified in the array ```columns```. -- Index comment (attribute ``comment``) is optional. -- Other attributes are mandatory. - Here is an example of the earlier presented simple table creation specification which also includes an index specification: .. code-block:: json - { "version" : 38, + { "version" : 39, "database" : "user_gapon", "table" : "employee", "schema" : [ @@ -261,6 +247,253 @@ This specification will result in creating the following table: ) ENGINE=MyISAM DEFAULT CHARSET=latin1; +.. _http-frontend-ingest-multipart: + +multipart/form-data +^^^^^^^^^^^^^^^^^^^ + +.. 
warning:: + + - The order of parts in the request body is important. The service expects the table payload to be sent last. + Otherwise, the service will fail to process the request. + - The ``multipart/form-data`` header is not required when using ``curl`` to send the request. The service will + automatically recognize the format of the request body. + +The following REST service implements the synchronous interface for ingesting a table into Qserv: + +.. list-table:: + :widths: 10 90 + :header-rows: 0 + + * - ``POST`` + - ``/ingest/csv`` + +The service requires a ``multipart/form-data``-formatted request body. The body must include the following parts +and files: + +``database`` : *part* + The required name of a user database. + +``table`` : *part* + The required name of a table. + +``fields_terminated_by`` : *part* = ``\t`` + The optional parameter of the desired CSV dialect: a character that separates fields in a row. + The dafault value assumes the tab character. + +``fields_enclosed_by`` : *part* = ``""`` + The optional parameter of the desired CSV dialect: a character that encloses fields in a row. + The default value assumes no quotes around fields. + +``fields_escaped_by`` : *part* = ``\\`` + The optional parameter of the desired CSV dialect: a character that escapes special characters in a field. + The default value assumes two backslash characters. + +``lines_terminated_by`` : *part* = ``\n`` + The optional parameter of the desired CSV dialect: a character that separates rows. + The default value assumes the newline character. + +``charset_name`` : *part* = ``latin1`` + The optional parameters specify the desired character set name to be assumed when ingesting + the contribution. The default value may be also affected by the ingest services configuration. 
+ See the following document for more details: + + - :ref:`ingest-api-advanced-charset` (ADVANCED) + +``timeout`` : *part* = ``300`` + The optional timeout (in seconds) that limits the duration of the internal operations initiated by the service. + In practical terms, this means that the total wait time for the completion of a request will not exceed the specified timeout. + + **Note**: The number specified as a value of the attribute can not be ``0``. + +``schema`` : *file* + The required schema definition. More information on the schema specification requirements can be found in the dedicated + section of the document: + + - :ref:`http-frontend-ingest-schema-spec` + +``indexes`` : *file* = ``[]`` + The optional indexes will be created after ingesting the table. The indexes must be a JSON file that follows + the index specification as described in the following section: + + - :ref:`http-frontend-ingest-index-spec` + +``rows`` : *file* + The required CSV file containing the data to be ingested. + +A call to this service will block the client application for the time required to create +a database (if it does not already exist), create a table, process and ingest the data, and perform +additional steps (such as creating indexes). The request will fail if it exceeds the specified (or implied) timeout. + +Here is an example of the simple table creation specification, which also includes an index specification. The table schema +is sent as a JSON file ``schema.json`` presented below: + +.. code-block:: json + + [ { "name" : "id", "type" : "INT" }, + { "name" : "val", "type" : "VARCHAR(32)" }, + { "name" : "active", "type" : "BOOL" } + ] + +The index specification is sent as a JSON file ``indexes.json`` presented below: + +.. 
code-block:: json + + [ { "index" : "idx_id", + "spec" : "UNIQUE", + "comment" : "This is the primary key index", + "columns" : [ + { "column" : "id", "length" : 0, "ascending" : 1 } + ] + } + ] + +And the CSV file ``employee.csv`` containing the data to be ingested: + +.. code-block:: + + 123,Igor Gaponenko,1 + 2,John Smith,0 + +The request could be pushed to the service using: + +.. code-block:: bash + + curl -k 'https://localhost:4041/ingest/csv' \ + -F 'database=user_gapon' \ + -F 'table=employee' \ + -F 'fields_terminated_by=,' \ + -F 'timeout=300' \ + -F 'schema=@/path/to/schema.json' \ + -F 'indexes=@/path/to/indexes.json' \ + -F 'rows=@/path/to/employee.csv' + +**Note**: The ``-k`` option is used to ignore SSL certificate verification. + +Here is the complete Python code that does the same: + +.. code-block:: python + + import requests + from requests_toolbelt.multipart.encoder import MultipartEncoder + import urllib3 + + # Supress the warning about the self-signed certificate + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + + database = "user_gapon" + table = "employee" + url = "https://localhost:4041/ingest/csv" + encoder = MultipartEncoder( + fields = { + "version": (None, "39"), + "database" : (None, database), + "table": (None, table), + "fields_terminated_by": (None, ","), + "timeout": (None, "300"), + "schema": ("schema.json", open("/path/to/schema.json", "rb"), "application/json"), + "indexes": ("indexes.json", open("/path/to/indexes.json", "rb"), "application/json"), + "rows": ("employee.csv", open("/path/to/employee.csv", "rb"), "text/csv"), + } + ) + req = requests.post(url, data=encoder, + headers={"Content-Type": encoder.content_type}, + verify=False) + req.raise_for_status() + res = req.json() + if res["success"] == 0: + error = res["error"] + raise RuntimeError(f"Failed to create and load the table: {table} in user database: {database}, error: {error}") + +**Notes**: + +- The parameter ``verify=False`` is used to ignore 
SSL certificate verification. Note using ``urllib3`` to suppress + the certificate-related warning. Do not use this in production code. +- The class ``MultipartEncoder`` is required for streaming large files w/o loading them into memory. + +.. _http-frontend-ingest-schema-spec: + +Schema specification +^^^^^^^^^^^^^^^^^^^^ + +.. note:: + + The service preserves the column order when creating a table. + +The table schema must be specified in the ``schema`` attribute of the table creation request. This attribute is a JSON array, +where each element of the array defines a column: + +.. code-block:: + + [ { "name" : , "type" : }, + ... + ] + +Where: + +``name`` + The name of a column + +``type`` + A valid MySQL type + +For example: + +.. code-block:: json + + [ { "name" : "id", "type" : "INT" }, + { "name" : "val", "type" : "VARCHAR(32)" }, + { "name" : "active", "type" : "BOOL" } + ] + +.. _http-frontend-ingest-index-spec: + +Index specification +^^^^^^^^^^^^^^^^^^^ + +.. note:: + + For detailed information on the schema of the index specifications, please refer to the following document: + + - :ref:`admin-data-table-index` (ADMIN) + +Indexes, if needed, must be specified in the ``indexes`` attribute of the table creation request. This attribute is a JSON array, +where each entry represents an index specification. + +.. code-block:: + + [ { "index" : , + "spec" : , + "comment" : , + "columns" : [ + { "column" : , "length" : , "ascending" : }, + ... + ] + }, + ... + ] + +A few notes: + +- It is possible to create one or many indexes in such specifications. +- Index names (attribute ``index``) must be unique for the table. +- An index may involve one or many columns as specified in the array ```columns```. +- Index comment (attribute ``comment``) is optional. +- Other attributes are mandatory. + +For example: + +.. 
code-block:: json + + [ { "index" : "idx_id", + "spec" : "UNIQUE", + "comment" : "This is the primary key index", + "columns" : [ + { "column" : "id", "length" : 0, "ascending" : 1 } + ] + } + ] + Deleting tables --------------- @@ -278,12 +511,6 @@ Where: ``database`` : *string* The required name of the user database containing the table to be deleted. - **Note**: Database names must start with the following prefix: - - .. code-block:: - - user__ - ``table`` : *string* The required name of a table to be deleted. @@ -293,7 +520,7 @@ For example: curl -k 'https://localhost:4041/ingest/table/user_gapon/employees' -X DELETE \ -H 'Content-Type: application/json' \ - -d'{"version":38}' + -d'{"version":39}' A few notes: @@ -320,19 +547,13 @@ Where: ``database`` : *string* The required name of a database to be deleted. - **Note**: Database names must start with the following prefix: - - .. code-block:: - - user__ - For example: .. code-block:: bash curl -k 'https://localhost:4041/ingest/database/user_gapon' -X DELETE \ -H 'Content-Type: application/json' \ - -d'{"version":38}' + -d'{"version":39}' A few notes: @@ -356,5 +577,3 @@ Potential enhancements for the table ingest service include: - Introducing a service for asynchronous table ingests. - Implementing a service to track the status and progress of asynchronous requests. - Providing a service to cancel queued asynchronous requests. -- Supporting table ingests using the ``multipart/form-data`` format, where data is sent as - a ``CSV``-formatted file. diff --git a/doc/user/http-frontend-query.rst b/doc/user/http-frontend-query.rst index 87230e208..ea76a86d2 100644 --- a/doc/user/http-frontend-query.rst +++ b/doc/user/http-frontend-query.rst @@ -129,7 +129,7 @@ For example, consider the following request: .. 
code-block:: bash - curl -k 'https://localhost:4041/query-async/status/123?version=38' -X GET + curl -k 'https://localhost:4041/query-async/status/123?version=39' -X GET It might result in the following response: @@ -354,7 +354,7 @@ For example, consider the following query submission request: .. code-block:: bash curl -k 'https://localhost:4041/query' -X POST-H 'Content-Type: application/json' \ - -d'{"version":38,"query":"SELECT objectId,coord_ra,coord_dec FROM dp02_dc2_catalogs.Object LIMIT 5"}' + -d'{"version":39,"query":"SELECT objectId,coord_ra,coord_dec FROM dp02_dc2_catalogs.Object LIMIT 5"}' The query could return: @@ -402,7 +402,7 @@ If the query identifier is not valid, the service will report an error in the re .. code-block:: bash - curl -k 'https://localhost:4041/query-async/123?version=38' -X DELETE + curl -k 'https://localhost:4041/query-async/123?version=39' -X DELETE It might result in the following response: diff --git a/doc/user/http-frontend.rst b/doc/user/http-frontend.rst index 5de379bba..0891f4943 100644 --- a/doc/user/http-frontend.rst +++ b/doc/user/http-frontend.rst @@ -1,10 +1,10 @@ .. note:: - - This guide corresponds to version **38** of the Qserv REST API. Note that each API implementation has a specific version. + - This guide corresponds to version **39** of the Qserv REST API. Note that each API implementation has a specific version. The version number will change if any modifications to the implementation or API that might affect users are made. This document will be updated to reflect the latest API version. - - As of version **38**, all communications with the service are over SSL/TLS encrypted connections. + - All communications with the service are over SSL/TLS encrypted connections. The service will not accept unencrypted connections. Use the ``-k`` option with ``curl`` to bypass SSL certificate verification if necessary.