Skip to content

Commit

Permalink
Merge branch 'master' into function-1
Browse files Browse the repository at this point in the history
  • Loading branch information
liugddx authored Feb 2, 2024
2 parents 19183ed + 45c451e commit 48196be
Show file tree
Hide file tree
Showing 573 changed files with 12,880 additions and 5,020 deletions.
1 change: 0 additions & 1 deletion .github/actions/create-or-update-comment
Submodule create-or-update-comment deleted from 23ff15
40 changes: 0 additions & 40 deletions .github/workflows/auto-pr-reply.yml

This file was deleted.

14 changes: 14 additions & 0 deletions .github/workflows/comment-to-trigger-teamcity.yml
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,13 @@ jobs:
"${{ steps.parse.outputs.COMMIT_ID_FROM_TRIGGER }}" \
"performance" \
"${{ steps.parse.outputs.COMMENT_REPEAT_TIMES }}"
trigger_or_skip_build \
"${{ steps.changes.outputs.changed_performance }}" \
"${{ steps.parse.outputs.PULL_REQUEST_NUM }}" \
"${{ steps.parse.outputs.COMMIT_ID_FROM_TRIGGER }}" \
"perf" \
"${{ steps.parse.outputs.COMMENT_REPEAT_TIMES }}"
else
echo "PR target branch not in (master, branch-2.0), skip run performance"
trigger_or_skip_build \
Expand All @@ -277,4 +284,11 @@ jobs:
"${{ steps.parse.outputs.COMMIT_ID_FROM_TRIGGER }}" \
"performance" \
"${{ steps.parse.outputs.COMMENT_REPEAT_TIMES }}"
trigger_or_skip_build \
"false" \
"${{ steps.parse.outputs.PULL_REQUEST_NUM }}" \
"${{ steps.parse.outputs.COMMIT_ID_FROM_TRIGGER }}" \
"perf" \
"${{ steps.parse.outputs.COMMENT_REPEAT_TIMES }}"
fi
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ thirdparty/installed*
thirdparty/doris-thirdparty*.tar.xz

docker/thirdparties/docker-compose/mysql/data
docker/thirdparties/docker-compose/hive/scripts/tpch1.db/
docker/thirdparties/docker-compose/hive/scripts/paimon1

fe_plugins/output
fe_plugins/**/.factorypath
Expand Down
3 changes: 0 additions & 3 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,3 @@
path = be/src/clucene
url = https://github.com/apache/doris-thirdparty.git
branch = clucene
[submodule ".github/actions/create-or-update-comment"]
path = .github/actions/create-or-update-comment
url = https://github.com/peter-evans/create-or-update-comment.git
276 changes: 97 additions & 179 deletions be/src/agent/agent_server.cpp

Large diffs are not rendered by default.

52 changes: 13 additions & 39 deletions be/src/agent/agent_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@

#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

namespace doris {

class TaskWorkerPoolIf;
class TaskWorkerPool;
class PriorTaskWorkerPool;
class ReportWorker;
Expand All @@ -36,22 +38,21 @@ class TAgentTaskRequest;
class TMasterInfo;
class TSnapshotRequest;
class StorageEngine;
class CloudStorageEngine;

// Each method corresponds to one RPC from FE Master, see BackendService.
class AgentServer {
public:
explicit AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info);
~AgentServer();

void start_workers(StorageEngine& engine, ExecEnv* exec_env);

void cloud_start_workers(CloudStorageEngine& engine, ExecEnv* exec_env);

// Receive agent task from FE master
void submit_tasks(TAgentResult& agent_result, const std::vector<TAgentTaskRequest>& tasks);

// TODO(lingbin): make the agent_result to be a pointer, because it will be modified.
static void make_snapshot(StorageEngine& engine, TAgentResult& agent_result,
const TSnapshotRequest& snapshot_request);
static void release_snapshot(StorageEngine& engine, TAgentResult& agent_result,
const std::string& snapshot_path);

// Deprecated
// TODO(lingbin): This method is deprecated, should be removed later.
// [[deprecated]]
Expand All @@ -60,43 +61,16 @@ class AgentServer {
TopicSubscriber* get_topic_subscriber() { return _topic_subscriber.get(); }

private:
void start_workers(ExecEnv* exec_env);

// Reference to the ExecEnv::_master_info
const TMasterInfo& _master_info;

std::unique_ptr<TaskWorkerPool> _create_tablet_workers;
std::unique_ptr<TaskWorkerPool> _drop_tablet_workers;
std::unique_ptr<PriorTaskWorkerPool> _push_load_workers;
std::unique_ptr<TaskWorkerPool> _publish_version_workers;
std::unique_ptr<TaskWorkerPool> _clear_transaction_task_workers;
std::unique_ptr<TaskWorkerPool> _push_delete_workers;
std::unique_ptr<TaskWorkerPool> _alter_tablet_workers;
std::unique_ptr<TaskWorkerPool> _alter_inverted_index_workers;
std::unique_ptr<TaskWorkerPool> _push_cooldown_conf_workers;
std::unique_ptr<TaskWorkerPool> _clone_workers;
std::unique_ptr<TaskWorkerPool> _storage_medium_migrate_workers;
std::unique_ptr<TaskWorkerPool> _check_consistency_workers;

// These 3 worker-pool do not accept tasks from FE.
// It is self triggered periodically and reports to Fe master
std::unique_ptr<ReportWorker> _report_task_workers;
std::unique_ptr<ReportWorker> _report_disk_state_workers;
std::unique_ptr<ReportWorker> _report_tablet_workers;

std::unique_ptr<TaskWorkerPool> _upload_workers;
std::unique_ptr<TaskWorkerPool> _download_workers;
std::unique_ptr<TaskWorkerPool> _make_snapshot_workers;
std::unique_ptr<TaskWorkerPool> _release_snapshot_workers;
std::unique_ptr<TaskWorkerPool> _move_dir_workers;
std::unique_ptr<TaskWorkerPool> _recover_tablet_workers;
std::unique_ptr<TaskWorkerPool> _update_tablet_meta_info_workers;

std::unique_ptr<TaskWorkerPool> _submit_table_compaction_workers;

std::unique_ptr<TaskWorkerPool> _push_storage_policy_workers;
std::unordered_map<int64_t /* TTaskType */, std::unique_ptr<TaskWorkerPoolIf>> _workers;

// These workers do not accept tasks from FE.
// It is self triggered periodically and reports to FE master
std::vector<std::unique_ptr<ReportWorker>> _report_workers;

std::unique_ptr<TopicSubscriber> _topic_subscriber;
std::unique_ptr<TaskWorkerPool> _gc_binlog_workers;
};

} // end namespace doris
2 changes: 1 addition & 1 deletion be/src/agent/be_exec_version_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class BeExecVersionManager {
* c. cleared old version of Version 2.
* d. unix_timestamp function support timestamp with float for datetimev2, and change nullable mode.
* e. change shuffle serialize/deserialize way
* f. the right function outputs NULL when the function contains NULL, substr function returns empty if start > str.length.
* f. the right function outputs NULL when the function contains NULL, substr function returns empty if start > str.length, and change some function nullable mode.
*/
constexpr inline int BeExecVersionManager::max_be_exec_version = 3;
constexpr inline int BeExecVersionManager::min_be_exec_version = 0;
Expand Down
20 changes: 12 additions & 8 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,8 @@ bool handle_report(const TReportRequest& request, const TMasterInfo& master_info
return true;
}

void _submit_task(const TAgentTaskRequest& task,
std::function<Status(const TAgentTaskRequest&)> submit_op) {
Status _submit_task(const TAgentTaskRequest& task,
std::function<Status(const TAgentTaskRequest&)> submit_op) {
const TTaskType::type task_type = task.task_type;
int64_t signature = task.signature;

Expand All @@ -314,18 +314,22 @@ void _submit_task(const TAgentTaskRequest& task,

if (!register_task_info(task_type, signature)) {
LOG_WARNING("failed to register task").tag("type", type_str).tag("signature", signature);
return;
// Duplicated task request, just return OK
return Status::OK();
}

// TODO(plat1ko): check task request member

// Set the receiving time of task so that we can determine whether it is timed out later
(const_cast<TAgentTaskRequest&>(task)).__set_recv_time(time(nullptr));
auto st = submit_op(task);
if (!st.ok()) [[unlikely]] {
LOG_INFO("failed to submit task").tag("type", type_str).tag("signature", signature);
return;
return st;
}

LOG_INFO("successfully submit task").tag("type", type_str).tag("signature", signature);
return Status::OK();
}

bvar::LatencyRecorder g_publish_version_latency("doris_pk", "publish_version");
Expand Down Expand Up @@ -426,8 +430,8 @@ void TaskWorkerPool::stop() {
}
}

void TaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
_submit_task(task, [this](auto&& task) {
Status TaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
return _submit_task(task, [this](auto&& task) {
add_task_count(task, 1);
return _thread_pool->submit_func([this, task]() {
_callback(task);
Expand Down Expand Up @@ -484,8 +488,8 @@ void PriorTaskWorkerPool::stop() {
}
}

void PriorTaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
_submit_task(task, [this](auto&& task) {
Status PriorTaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
return _submit_task(task, [this](auto&& task) {
auto req = std::make_unique<TAgentTaskRequest>(task);
add_task_count(*req, 1);
if (req->__isset.priority && req->priority == TPriority::HIGH) {
Expand Down
20 changes: 14 additions & 6 deletions be/src/agent/task_worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <string>
#include <string_view>

#include "common/status.h"
#include "gutil/ref_counted.h"

namespace doris {
Expand All @@ -39,16 +40,23 @@ class TReportRequest;
class TTabletInfo;
class TAgentTaskRequest;

class TaskWorkerPool {
class TaskWorkerPoolIf {
public:
virtual ~TaskWorkerPoolIf() = default;

virtual Status submit_task(const TAgentTaskRequest& task) = 0;
};

class TaskWorkerPool : public TaskWorkerPoolIf {
public:
TaskWorkerPool(std::string_view name, int worker_count,
std::function<void(const TAgentTaskRequest&)> callback);

virtual ~TaskWorkerPool();
~TaskWorkerPool() override;

void stop();

void submit_task(const TAgentTaskRequest& task);
Status submit_task(const TAgentTaskRequest& task) override;

protected:
std::atomic_bool _stopped {false};
Expand All @@ -68,16 +76,16 @@ class PublishVersionWorkerPool final : public TaskWorkerPool {
StorageEngine& _engine;
};

class PriorTaskWorkerPool {
class PriorTaskWorkerPool final : public TaskWorkerPoolIf {
public:
PriorTaskWorkerPool(std::string_view name, int normal_worker_count, int high_prior_worker_conut,
std::function<void(const TAgentTaskRequest& task)> callback);

~PriorTaskWorkerPool();
~PriorTaskWorkerPool() override;

void stop();

void submit_task(const TAgentTaskRequest& task);
Status submit_task(const TAgentTaskRequest& task) override;

private:
void normal_loop();
Expand Down
48 changes: 48 additions & 0 deletions be/src/cloud/cloud_backend_service.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "cloud/cloud_backend_service.h"

#include "common/config.h"
#include "util/thrift_server.h"

namespace doris {

CloudBackendService::CloudBackendService(CloudStorageEngine& engine, ExecEnv* exec_env)
: BaseBackendService(exec_env), _engine(engine) {}

CloudBackendService::~CloudBackendService() = default;

Status CloudBackendService::create_service(CloudStorageEngine& engine, ExecEnv* exec_env, int port,
std::unique_ptr<ThriftServer>* server) {
auto service = std::make_shared<CloudBackendService>(engine, exec_env);
service->_agent_server->cloud_start_workers(engine, exec_env);
// TODO: do we want a BoostThreadFactory?
// TODO: we want separate thread factories here, so that fe requests can't starve
// be requests
// std::shared_ptr<TProcessor> be_processor = std::make_shared<BackendServiceProcessor>(service);
auto be_processor = std::make_shared<BackendServiceProcessor>(service);

*server = std::make_unique<ThriftServer>("backend", be_processor, port,
config::be_service_threads);

LOG(INFO) << "Doris CloudBackendService listening on " << port;

return Status::OK();
}

} // namespace doris
41 changes: 41 additions & 0 deletions be/src/cloud/cloud_backend_service.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "service/backend_service.h"

namespace doris {

class CloudStorageEngine;

class CloudBackendService final : public BaseBackendService {
public:
static Status create_service(CloudStorageEngine& engine, ExecEnv* exec_env, int port,
std::unique_ptr<ThriftServer>* server);

CloudBackendService(CloudStorageEngine& engine, ExecEnv* exec_env);

~CloudBackendService() override;

// TODO(plat1ko): cloud backend functions

private:
[[maybe_unused]] CloudStorageEngine& _engine;
};

} // namespace doris
Loading

0 comments on commit 48196be

Please sign in to comment.