Skip to content

Commit

Permalink
Migrated HTTP client classes and tests into the dedicated module
Browse files Browse the repository at this point in the history
  • Loading branch information
iagaponenko committed Nov 3, 2023
1 parent 4f148d7 commit 4049df5
Show file tree
Hide file tree
Showing 23 changed files with 417 additions and 383 deletions.
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ add_subdirectory(ccontrol)
add_subdirectory(css)
add_subdirectory(czar)
add_subdirectory(global)
add_subdirectory(http)
add_subdirectory(memman)
add_subdirectory(mimic)
add_subdirectory(mysql)
Expand Down
136 changes: 68 additions & 68 deletions src/replica/HttpAsyncReq.cc → src/http/AsyncReq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
*/

// Class header
#include "replica/HttpAsyncReq.h"
#include "http/AsyncReq.h"

// Standard headers
#include <algorithm>
Expand All @@ -31,7 +31,7 @@
#include "boost/date_time/posix_time/posix_time.hpp"

// Qserv headers
#include "replica/HttpExceptions.h"
#include "http/Exceptions.h"

// LSST headers
#include "lsst/log/Log.h"
Expand All @@ -42,7 +42,7 @@ namespace http = boost::beast::http;

namespace {

LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.HttpAsyncReq");
LOG_LOGGER _log = LOG_GET("lsst.qserv.http.AsyncReq");

http::verb method2verb(string const& method) {
if (method == "GET")
Expand All @@ -53,13 +53,13 @@ http::verb method2verb(string const& method) {
return http::verb::put;
else if (method == "DELETE")
return http::verb::delete_;
throw invalid_argument("HttpAsyncReq::" + string(__func__) + " invalid method '" + method + "'.");
throw invalid_argument("AsyncReq::" + string(__func__) + " invalid method '" + method + "'.");
}
} // namespace

namespace lsst::qserv::replica {
namespace lsst::qserv::http {

string HttpAsyncReq::state2str(State state) {
string AsyncReq::state2str(State state) {
switch (state) {
case State::CREATED:
return "CREATED";
Expand All @@ -76,22 +76,22 @@ string HttpAsyncReq::state2str(State state) {
case State::EXPIRED:
return "EXPIRED";
}
throw invalid_argument("HttpAsyncReq::" + string(__func__) +
throw invalid_argument("AsyncReq::" + string(__func__) +
" unknown state: " + to_string(static_cast<int>(state)) + ".");
}

shared_ptr<HttpAsyncReq> HttpAsyncReq::create(asio::io_service& io_service, CallbackType const& onFinish,
string const& method, string const& url, string const& data,
std::unordered_map<std::string, std::string> const& headers,
size_t maxResponseBodySize, unsigned int expirationIvalSec) {
return shared_ptr<HttpAsyncReq>(new HttpAsyncReq(io_service, onFinish, method, url, data, headers,
maxResponseBodySize, expirationIvalSec));
shared_ptr<AsyncReq> AsyncReq::create(asio::io_service& io_service, CallbackType const& onFinish,
string const& method, string const& url, string const& data,
std::unordered_map<std::string, std::string> const& headers,
size_t maxResponseBodySize, unsigned int expirationIvalSec) {
return shared_ptr<AsyncReq>(new AsyncReq(io_service, onFinish, method, url, data, headers,
maxResponseBodySize, expirationIvalSec));
}

HttpAsyncReq::HttpAsyncReq(asio::io_service& io_service, CallbackType const& onFinish, string const& method,
string const& url, string const& data,
std::unordered_map<std::string, std::string> const& headers,
size_t maxResponseBodySize, unsigned int expirationIvalSec)
AsyncReq::AsyncReq(asio::io_service& io_service, CallbackType const& onFinish, string const& method,
string const& url, string const& data,
std::unordered_map<std::string, std::string> const& headers, size_t maxResponseBodySize,
unsigned int expirationIvalSec)
: _io_service(io_service),
_resolver(io_service),
_socket(io_service),
Expand All @@ -104,7 +104,7 @@ HttpAsyncReq::HttpAsyncReq(asio::io_service& io_service, CallbackType const& onF
_expirationIvalSec(expirationIvalSec),
_expirationTimer(io_service),
_timer(io_service) {
string const context = "HttpAsyncReq::" + string(__func__) + " ";
string const context = "AsyncReq::" + string(__func__) + " ";
if (_url.scheme() != Url::Scheme::HTTP) {
throw invalid_argument(context + "this implementation only supports urls based on the HTTP scheme.");
}
Expand All @@ -123,14 +123,14 @@ HttpAsyncReq::HttpAsyncReq(asio::io_service& io_service, CallbackType const& onF
}
}

HttpAsyncReq::~HttpAsyncReq() {
AsyncReq::~AsyncReq() {
_expirationTimer.cancel();
_timer.cancel();
boost::system::error_code ec;
_socket.shutdown(asio::ip::tcp::socket::shutdown_both, ec);
}

string HttpAsyncReq::version() const {
string AsyncReq::version() const {
switch (_req.version()) {
case 11:
return "HTTP/1.1";
Expand All @@ -139,9 +139,9 @@ string HttpAsyncReq::version() const {
}
}

void HttpAsyncReq::start() {
string const context = "HttpAsyncReq::" + string(__func__) + " ";
replica::Lock lock(_mtx, context);
void AsyncReq::start() {
string const context = "AsyncReq::" + string(__func__) + " ";
std::lock_guard<std::mutex> const lock(_mtx);
_assertState(lock, context, {State::CREATED});
try {
_resolve(lock);
Expand All @@ -157,9 +157,9 @@ void HttpAsyncReq::start() {
}
}

bool HttpAsyncReq::cancel() {
string const context = "HttpAsyncReq::" + string(__func__) + " ";
replica::Lock lock(_mtx, context);
bool AsyncReq::cancel() {
string const context = "AsyncReq::" + string(__func__) + " ";
std::lock_guard<std::mutex> const lock(_mtx);
switch (_state) {
case State::CREATED:
case State::IN_PROGRESS:
Expand All @@ -170,62 +170,62 @@ bool HttpAsyncReq::cancel() {
}
}

string HttpAsyncReq::errorMessage() const {
string const context = "HttpAsyncReq::" + string(__func__) + " ";
replica::Lock lock(_mtx, context);
string AsyncReq::errorMessage() const {
string const context = "AsyncReq::" + string(__func__) + " ";
std::lock_guard<std::mutex> const lock(_mtx);
return _error;
}

int HttpAsyncReq::responseCode() const {
string const context = "HttpAsyncReq::" + string(__func__) + " ";
replica::Lock lock(_mtx, context);
int AsyncReq::responseCode() const {
string const context = "AsyncReq::" + string(__func__) + " ";
std::lock_guard<std::mutex> const lock(_mtx);
_assertState(lock, context, {State::FINISHED, State::BODY_LIMIT_ERROR});
auto const& header = _res.get().base();
return header.result_int();
}

unordered_map<string, string> const& HttpAsyncReq::responseHeader() const {
string const context = "HttpAsyncReq::" + string(__func__) + " ";
replica::Lock lock(_mtx, context);
unordered_map<string, string> const& AsyncReq::responseHeader() const {
string const context = "AsyncReq::" + string(__func__) + " ";
std::lock_guard<std::mutex> const lock(_mtx);
_assertState(lock, context, {State::FINISHED, State::BODY_LIMIT_ERROR});
return _responseHeader;
}

string const& HttpAsyncReq::responseBody() const {
string const context = "HttpAsyncReq::" + string(__func__) + " ";
replica::Lock lock(_mtx, context);
string const& AsyncReq::responseBody() const {
string const context = "AsyncReq::" + string(__func__) + " ";
std::lock_guard<std::mutex> const lock(_mtx);
_assertState(lock, context, {State::FINISHED});
return _res.get().body();
}

size_t HttpAsyncReq::responseBodySize() const {
string const context = "HttpAsyncReq::" + string(__func__) + " ";
replica::Lock lock(_mtx, context);
size_t AsyncReq::responseBodySize() const {
string const context = "AsyncReq::" + string(__func__) + " ";
std::lock_guard<std::mutex> const lock(_mtx);
_assertState(lock, context, {State::FINISHED});
return _res.get().body().size();
}

void HttpAsyncReq::_restart(replica::Lock const& lock) {
void AsyncReq::_restart(std::lock_guard<std::mutex> const& lock) {
_timer.cancel();
_timer.expires_from_now(boost::posix_time::seconds(_timerIvalSec));
_timer.async_wait(
[self = shared_from_this()](boost::system::error_code const& ec) { self->_restarted(ec); });
}

void HttpAsyncReq::_restarted(boost::system::error_code const& ec) {
string const context = "HttpAsyncReq::" + string(__func__) + " ";
void AsyncReq::_restarted(boost::system::error_code const& ec) {
string const context = "AsyncReq::" + string(__func__) + " ";

// Ignore this event if the timer was aborted
if (ec == boost::asio::error::operation_aborted) return;

if (State::IN_PROGRESS != _state) return;
replica::Lock lock(_mtx, context);
std::lock_guard<std::mutex> const lock(_mtx);
if (State::IN_PROGRESS != _state) return;

_resolve(lock);
}

void HttpAsyncReq::_resolve(replica::Lock const& lock) {
void AsyncReq::_resolve(std::lock_guard<std::mutex> const& lock) {
_resolver.async_resolve(
_url.host(), to_string(_url.port() == 0 ? 80 : _url.port()),
[self = shared_from_this()](boost::system::error_code const& ec,
Expand All @@ -234,12 +234,12 @@ void HttpAsyncReq::_resolve(replica::Lock const& lock) {
});
}

void HttpAsyncReq::_resolved(boost::system::error_code const& ec,
asio::ip::tcp::resolver::results_type const& results) {
string const context = "HttpAsyncReq::" + string(__func__) + " ";
void AsyncReq::_resolved(boost::system::error_code const& ec,
asio::ip::tcp::resolver::results_type const& results) {
string const context = "AsyncReq::" + string(__func__) + " ";

if (State::IN_PROGRESS != _state) return;
replica::Lock lock(_mtx, context);
std::lock_guard<std::mutex> const lock(_mtx);
if (State::IN_PROGRESS != _state) return;

if (ec.value() != 0) {
Expand All @@ -253,11 +253,11 @@ void HttpAsyncReq::_resolved(boost::system::error_code const& ec,
const asio::ip::tcp::endpoint& endpoint) { self->_connected(ec); });
}

void HttpAsyncReq::_connected(boost::system::error_code const& ec) {
string const context = "HttpAsyncReq::" + string(__func__) + " ";
void AsyncReq::_connected(boost::system::error_code const& ec) {
string const context = "AsyncReq::" + string(__func__) + " ";

if (State::IN_PROGRESS != _state) return;
replica::Lock lock(_mtx, context);
std::lock_guard<std::mutex> const lock(_mtx);
if (State::IN_PROGRESS != _state) return;

if (ec.value() != 0) {
Expand All @@ -271,11 +271,11 @@ void HttpAsyncReq::_connected(boost::system::error_code const& ec) {
});
}

void HttpAsyncReq::_sent(boost::system::error_code const& ec, size_t bytesSent) {
string const context = "HttpAsyncReq::" + string(__func__) + " ";
void AsyncReq::_sent(boost::system::error_code const& ec, size_t bytesSent) {
string const context = "AsyncReq::" + string(__func__) + " ";

if (State::IN_PROGRESS != _state) return;
replica::Lock lock(_mtx, context);
std::lock_guard<std::mutex> const lock(_mtx);
if (State::IN_PROGRESS != _state) return;

if (ec.value() != 0) {
Expand All @@ -292,11 +292,11 @@ void HttpAsyncReq::_sent(boost::system::error_code const& ec, size_t bytesSent)
});
}

void HttpAsyncReq::_received(boost::system::error_code const& ec, size_t bytesReceived) {
string const context = "HttpAsyncReq::" + string(__func__) + " ";
void AsyncReq::_received(boost::system::error_code const& ec, size_t bytesReceived) {
string const context = "AsyncReq::" + string(__func__) + " ";

if (_state != State::IN_PROGRESS) return;
replica::Lock lock(_mtx, context);
std::lock_guard<std::mutex> const lock(_mtx);
if (_state != State::IN_PROGRESS) return;

State newState = State::FINISHED;
Expand All @@ -317,27 +317,27 @@ void HttpAsyncReq::_received(boost::system::error_code const& ec, size_t bytesRe
_finish(lock, newState);
}

void HttpAsyncReq::_extractCacheHeader(replica::Lock const& lock) {
void AsyncReq::_extractCacheHeader(std::lock_guard<std::mutex> const& lock) {
auto const& header = _res.get().base();
for (auto itr = header.cbegin(); itr != header.cend(); ++itr) {
_responseHeader.insert(pair<string, string>(itr->name_string(), itr->value()));
}
}

void HttpAsyncReq::_expired(boost::system::error_code const& ec) {
string const context = "HttpAsyncReq::" + string(__func__) + " ";
void AsyncReq::_expired(boost::system::error_code const& ec) {
string const context = "AsyncReq::" + string(__func__) + " ";

// Ignore this event if the timer was aborted
if (ec == boost::asio::error::operation_aborted) return;

if (_state != State::IN_PROGRESS) return;
replica::Lock lock(_mtx, context);
std::lock_guard<std::mutex> const lock(_mtx);
if (_state != State::IN_PROGRESS) return;

_finish(lock, State::EXPIRED);
}

void HttpAsyncReq::_finish(replica::Lock const& lock, State finalState, string const& error) {
void AsyncReq::_finish(std::lock_guard<std::mutex> const& lock, State finalState, string const& error) {
_state = finalState;
_error = error;

Expand All @@ -360,8 +360,8 @@ void HttpAsyncReq::_finish(replica::Lock const& lock, State finalState, string c
}
}

void HttpAsyncReq::_assertState(replica::Lock const& lock, string const& context,
initializer_list<State> const& desiredStates) const {
void AsyncReq::_assertState(std::lock_guard<std::mutex> const& lock, string const& context,
initializer_list<State> const& desiredStates) const {
if (find(desiredStates.begin(), desiredStates.end(), _state) == desiredStates.end()) {
string states;
for (auto&& state : desiredStates) {
Expand All @@ -373,11 +373,11 @@ void HttpAsyncReq::_assertState(replica::Lock const& lock, string const& context
}
}

void HttpAsyncReq::_logError(string const& prefix, boost::system::error_code const& ec) const {
void AsyncReq::_logError(string const& prefix, boost::system::error_code const& ec) const {
LOGS(_log, LOG_LVL_WARN,
prefix << " method: " << _method << " url: " << _url.url() << " host: " << _url.host()
<< " port: " << _url.port() << " target: " << _url.target() << " ec: " << ec.value() << " ["
<< ec.message() << "]");
}

} // namespace lsst::qserv::replica
} // namespace lsst::qserv::http
Loading

0 comments on commit 4049df5

Please sign in to comment.