Skip to content

Commit

Permalink
Add injection point for ms
Browse files Browse the repository at this point in the history
  • Loading branch information
SWJTU-ZhangLei committed Sep 27, 2024
1 parent e4a1d3d commit 359f516
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 0 deletions.
1 change: 1 addition & 0 deletions cloud/src/meta-service/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ add_library(MetaService
meta_server.cpp
meta_service.cpp
meta_service_http.cpp
injection_point_http.cpp
meta_service_job.cpp
meta_service_resource.cpp
meta_service_schema.cpp
Expand Down
159 changes: 159 additions & 0 deletions cloud/src/meta-service/injection_point_http.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <brpc/controller.h>
#include <brpc/http_status_code.h>
#include <brpc/uri.h>
#include <fmt/format.h>
#include <gen_cpp/cloud.pb.h>

#include "common/config.h"
#include "common/logging.h"
#include "cpp/sync_point.h"
#include "meta-service/keys.h"
#include "meta-service/txn_kv.h"
#include "meta-service/txn_kv_error.h"
#include "meta_service.h"
#include "meta_service_http.h"

namespace doris::cloud {

HttpResponse set_sleep(const std::string& point, const brpc::URI& uri) {
std::string duration_str(http_query(uri, "duration"));
int64_t duration = 0;
try {
duration = std::stol(duration_str);
} catch (const std::exception& e) {
auto msg = fmt::format("invalid duration:{}", duration_str);
LOG(WARNING) << msg;
return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg);
}

auto sp = SyncPoint::get_instance();
sp->set_call_back(point, [point, duration](auto&& args) {
LOG(INFO) << "injection point hit, point=" << point << " sleep milliseconds=" << duration;
std::this_thread::sleep_for(std::chrono::milliseconds(duration));
});
return http_json_reply(MetaServiceCode::OK, "OK");
}

HttpResponse set_return(const std::string& point, const brpc::URI& uri) {
auto sp = SyncPoint::get_instance();
sp->set_call_back(point, [point](auto&& args) {
try {
LOG(INFO) << "injection point hit, point=" << point << " return void";
auto pred = try_any_cast<bool*>(args.back());
*pred = true;
} catch (const std::bad_any_cast& e) {
LOG_EVERY_N(ERROR, 10) << "failed to process `return` e:" << e.what();
}
});

return http_json_reply(MetaServiceCode::OK, "OK");
}

HttpResponse set_return_ok(const std::string& point, const brpc::URI& uri) {
auto sp = SyncPoint::get_instance();
sp->set_call_back(point, [point](auto&& args) {
try {
LOG(INFO) << "injection point hit, point=" << point << " return ok";
auto* pair = try_any_cast_ret<MetaServiceCode>(args);
pair->first = MetaServiceCode::OK;
pair->second = true;
} catch (const std::bad_any_cast& e) {
LOG_EVERY_N(ERROR, 10) << "failed to process `return_ok` e:" << e.what();
}
});
return http_json_reply(MetaServiceCode::OK, "OK");
}

HttpResponse set_return_error(const std::string& point, const brpc::URI& uri) {
//todo:
return http_json_reply(MetaServiceCode::OK, "OK");
}

HttpResponse handle_set(const brpc::URI& uri) {
const std::string point(http_query(uri, "name"));
if (point.empty()) {
return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "empty point name");
}

const std::string behavior(http_query(uri, "behavior"));
if (behavior.empty()) {
return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "empty behavior");
}
if (behavior == "sleep") {
return set_sleep(point, uri);
} else if (behavior == "return") {
return set_return(point, uri);
} else if (behavior == "return_ok") {
return set_return_ok(point, uri);
} else if (behavior == "return_error") {
return set_return_error(point, uri);
}

return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "unknown behavior: " + behavior);
}

HttpResponse handle_clear(const brpc::URI& uri) {
const std::string point(http_query(uri, "name"));
auto* sp = SyncPoint::get_instance();
LOG(INFO) << "clear injection point : " << (point.empty() ? "(all points)" : point);
if (point.empty()) {
// If point name is emtpy, clear all
sp->clear_all_call_backs();
return http_json_reply(MetaServiceCode::OK, "OK");
}
sp->clear_call_back(point);
return http_json_reply(MetaServiceCode::OK, "OK");
}

HttpResponse handle_apply_suite(const brpc::URI& uri) {
return http_json_reply(MetaServiceCode::OK, "OK");
}

HttpResponse handle_enable(const brpc::URI& uri) {
SyncPoint::get_instance()->enable_processing();
return http_json_reply(MetaServiceCode::OK, "OK");
}

HttpResponse handle_disable(const brpc::URI& uri) {
SyncPoint::get_instance()->disable_processing();
return http_json_reply(MetaServiceCode::OK, "OK");
}

HttpResponse process_injection_point(MetaServiceImpl* service, brpc::Controller* ctrl) {
auto& uri = ctrl->http_request().uri();
LOG(INFO) << "handle InjectionPointAction uri:" << uri;
const std::string op(http_query(uri, "op"));

if (op == "set") {
return handle_set(uri);
} else if (op == "clear") {
return handle_clear(uri);
} else if (op == "apply_suite") {
return handle_apply_suite(uri);
} else if (op == "enable") {
return handle_enable(uri);
} else if (op == "disable") {
return handle_disable(uri);
}

return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "unknown op:" + op);
}
} // namespace doris::cloud
4 changes: 4 additions & 0 deletions cloud/src/meta-service/meta_service_http.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ static std::string_view remove_version_prefix(std::string_view path) {
return path;
}

HttpResponse process_injection_point(MetaServiceImpl* service, brpc::Controller* ctrl);

static HttpResponse process_alter_cluster(MetaServiceImpl* service, brpc::Controller* ctrl) {
static std::unordered_map<std::string_view, AlterClusterRequest::Operation> operations {
{"add_cluster", AlterClusterRequest::ADD_CLUSTER},
Expand Down Expand Up @@ -545,10 +547,12 @@ void MetaServiceImpl::http(::google::protobuf::RpcController* controller,
{"encode_key", process_encode_key},
{"get_value", process_get_value},
{"show_meta_ranges", process_show_meta_ranges},
{"injection_point", process_injection_point},
{"v1/decode_key", process_decode_key},
{"v1/encode_key", process_encode_key},
{"v1/get_value", process_get_value},
{"v1/show_meta_ranges", process_show_meta_ranges},
{"v1/injection_point", process_injection_point},
// for get
{"get_instance", process_get_instance_info},
{"get_obj_store_info", process_get_obj_store_info},
Expand Down

0 comments on commit 359f516

Please sign in to comment.