From 4d76fd989e14c6bbf0415960304d0ed895d65cd3 Mon Sep 17 00:00:00 2001 From: xiehan <52160700+Barenboim@users.noreply.github.com> Date: Wed, 22 Nov 2023 17:16:20 +0800 Subject: [PATCH] Remove MapReduce (to be redesigned). (#1429) --- CMakeLists_Headers.txt | 2 - src/algorithm/MapReduce.h | 68 ------ src/algorithm/MapReduce.inl | 379 ----------------------------- src/factory/WFAlgoTaskFactory.h | 24 -- src/factory/WFAlgoTaskFactory.inl | 73 ------ src/include/workflow/MapReduce.h | 1 - src/include/workflow/MapReduce.inl | 1 - tutorial/CMakeLists.txt | 1 - tutorial/tutorial-20-reducer.cc | 90 ------- 9 files changed, 639 deletions(-) delete mode 100644 src/algorithm/MapReduce.h delete mode 100644 src/algorithm/MapReduce.inl delete mode 120000 src/include/workflow/MapReduce.h delete mode 120000 src/include/workflow/MapReduce.inl delete mode 100644 tutorial/tutorial-20-reducer.cc diff --git a/CMakeLists_Headers.txt b/CMakeLists_Headers.txt index 95fa6c781a..a3924d03c8 100644 --- a/CMakeLists_Headers.txt +++ b/CMakeLists_Headers.txt @@ -32,8 +32,6 @@ else () endif () set(INCLUDE_HEADERS - src/algorithm/MapReduce.h - src/algorithm/MapReduce.inl src/protocol/ProtocolMessage.h src/protocol/http_parser.h src/protocol/HttpMessage.h diff --git a/src/algorithm/MapReduce.h b/src/algorithm/MapReduce.h deleted file mode 100644 index 23184c3a53..0000000000 --- a/src/algorithm/MapReduce.h +++ /dev/null @@ -1,68 +0,0 @@ -/* - Copyright (c) 2020 Sogou, Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- See the License for the specific language governing permissions and - limitations under the License. - - Author: Xie Han (xiehan@sogou-inc.com) -*/ - -#ifndef _MAPREDUCE_H_ -#define _MAPREDUCE_H_ - -#include -#include -#include -#include "rbtree.h" - -namespace algorithm -{ - -template -class ReduceIterator -{ -public: - virtual const VAL *next() = 0; - virtual size_t size() = 0; - -protected: - virtual ~ReduceIterator() { } -}; - -template -using reduce_function_t = - std::function *, VAL *)>; - -template -class Reducer -{ -public: - void insert(KEY&& key, VAL&& val); - -public: - void start(reduce_function_t reduce, - std::vector> *output); - -private: - struct rb_root key_tree; - -public: - Reducer() { this->key_tree.rb_node = NULL; } - virtual ~Reducer(); -}; - -} - -#include "MapReduce.inl" - -#endif - diff --git a/src/algorithm/MapReduce.inl b/src/algorithm/MapReduce.inl deleted file mode 100644 index 24ada091b3..0000000000 --- a/src/algorithm/MapReduce.inl +++ /dev/null @@ -1,379 +0,0 @@ -/* - Copyright (c) 2020 Sogou, Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
- - Author: Xie Han (xiehan@sogou-inc.com) -*/ - -#include -#include -#include -#include -#include "rbtree.h" -#include "list.h" - -namespace algorithm -{ - -template -struct __ReduceValue -{ - struct list_head list; - VAL value; - __ReduceValue(VAL&& val) : value(std::move(val)) { } -}; - -template -struct __ReduceKey -{ - struct rb_node rb; - KEY key; - struct list_head value_list; - size_t value_cnt; - - __ReduceKey(KEY&& k) : key(std::move(k)) - { - INIT_LIST_HEAD(&this->value_list); - this->value_cnt = 0; - } - - void insert(VAL&& value) - { - __ReduceValue *entry = new __ReduceValue(std::move(value)); - list_add_tail(&entry->list, &this->value_list); - this->value_cnt++; - } - - ~__ReduceKey() - { - struct list_head *pos, *tmp; - list_for_each_safe(pos, tmp, &this->value_list) - delete list_entry(pos, struct __ReduceValue, list); - } -}; - -template::value> -class __ReduceIterator; - -#define __REDUCE_ITERATOR_HEAP_MAX 256 - -/* VAL is a class. VAL must have size() method. */ -template -class __ReduceIterator : public ReduceIterator -{ -public: - virtual const VAL *next(); - virtual size_t size() { return this->original_size; } - -private: - void reduce_begin() { this->original_size = this->heap_size; } - - void reduce_end(VAL&& value) - { - size_t n = this->original_size; - - assert(n != this->heap_size); - while (--n != this->heap_size) - delete this->heap[n]; - - this->heap[n]->value = std::move(value); - this->heap_insert(this->heap[n]); - } - - size_t count() { return this->heap_size; } - - __ReduceValue *value() { return this->heap[0]; } - -private: - void heapify(int top); - void heap_insert(__ReduceValue *data); - -private: - __ReduceValue *heap[__REDUCE_ITERATOR_HEAP_MAX]; - size_t heap_size; - size_t original_size; - -private: - __ReduceIterator(struct list_head *value_list, size_t *value_cnt); - template friend class Reducer; -}; - -template -const VAL *__ReduceIterator::next() -{ - __ReduceValue *data = this->heap[0]; - - if (this->heap_size == 0) 
- return NULL; - - this->heap[0] = this->heap[--this->heap_size]; - this->heapify(0); - this->heap[this->heap_size] = data; - return &data->value; -} - -template -void __ReduceIterator::heapify(int top) -{ - __ReduceValue *data = this->heap[top]; - __ReduceValue **child; - int last = this->heap_size - 1; - int i; - - while (i = 2 * top + 1, i < last) - { - child = &this->heap[i]; - if (child[0]->value.size() < data->value.size()) - { - if (child[1]->value.size() < child[0]->value.size()) - { - this->heap[top] = child[1]; - top = i + 1; - } - else - { - this->heap[top] = child[0]; - top = i; - } - } - else - { - if (child[1]->value.size() < data->value.size()) - { - this->heap[top] = child[1]; - top = i + 1; - } - else - { - this->heap[top] = data; - return; - } - } - } - - if (i == last) - { - child = &this->heap[i]; - if (child[0]->value.size() < data->value.size()) - { - this->heap[top] = child[0]; - top = i; - } - } - - this->heap[top] = data; -} - -template -void __ReduceIterator::heap_insert(__ReduceValue *data) -{ - __ReduceValue *parent; - int i = this->heap_size; - - while (i > 0) - { - parent = this->heap[(i - 1) / 2]; - if (data->value.size() < parent->value.size()) - { - this->heap[i] = parent; - i = (i - 1) / 2; - } - else - break; - } - - this->heap[i] = data; - this->heap_size++; -} - -template -__ReduceIterator::__ReduceIterator(struct list_head *value_list, - size_t *value_cnt) -{ - struct list_head *pos, *tmp; - int n = 0; - - list_for_each_safe(pos, tmp, value_list) - { - if (n == __REDUCE_ITERATOR_HEAP_MAX) - break; - - list_del(pos); - this->heap[n++] = list_entry(pos, __ReduceValue, list); - } - - this->heap_size = n; - *value_cnt -= n; - n /= 2; - while (n > 0) - this->heapify(--n); -} - -#undef __REDUCE_ITERATOR_HEAP_MAX - -/* VAL is not a class. 
*/ -template -class __ReduceIterator : public ReduceIterator -{ -public: - virtual const VAL *next() - { - if (this->cursor->next == &this->value_list) - return NULL; - - this->cursor = this->cursor->next; - this->value_cnt--; - return &list_entry(this->cursor, __ReduceValue, list)->value; - } - - virtual size_t size() { return this->original_size; } - -private: - void reduce_begin() - { - this->cursor = &this->value_list; - this->original_size = this->value_cnt; - } - - void reduce_end(VAL&& value); - - size_t count() { return this->value_cnt; } - - __ReduceValue *value() - { - return list_entry(this->value_list.next, __ReduceValue, list); - } - -private: - struct list_head value_list; - size_t value_cnt; - size_t original_size; - struct list_head *cursor; - -private: - __ReduceIterator(struct list_head *value_list, size_t *value_cnt) - { - INIT_LIST_HEAD(&this->value_list); - list_splice_init(value_list, &this->value_list); - this->value_cnt = *value_cnt; - *value_cnt = 0; - } - - template friend class Reducer; -}; - -template -void __ReduceIterator::reduce_end(VAL&& value) -{ - __ReduceValue *entry; - - assert(this->cursor != &this->value_list); - while (this->value_list.next != this->cursor) - { - entry = list_entry(this->value_list.next, __ReduceValue, list); - list_del(&entry->list); - delete entry; - } - - entry = list_entry(this->cursor, __ReduceValue, list); - entry->value = std::move(value); - list_move_tail(&entry->list, &this->value_list); - this->value_cnt++; -} - -template -void Reducer::insert(KEY&& key, VAL&& value) -{ - struct rb_node **p = &this->key_tree.rb_node; - struct rb_node *parent = NULL; - __ReduceKey *entry; - - while (*p) - { - parent = *p; - using TYPE = __ReduceKey; - entry = rb_entry(*p, TYPE, rb); - if (key < entry->key) - p = &(*p)->rb_left; - else if (key > entry->key) - p = &(*p)->rb_right; - else - break; - } - - if (!*p) - { - entry = new __ReduceKey(std::move(key)); - rb_link_node(&entry->rb, parent, p); - 
rb_insert_color(&entry->rb, &this->key_tree); - } - - entry->insert(std::move(value)); -} - -template -void Reducer::start(reduce_function_t reduce, - std::vector> *result) -{ - struct rb_node *p = rb_first(&this->key_tree); - __ReduceKey *key; - __ReduceValue *value; - - while (p) - { - using TYPE = __ReduceKey; - key = rb_entry(p, TYPE, rb); - while (key->value_cnt > 1) - { - __ReduceIterator iter(&key->value_list, &key->value_cnt); - - do - { - VAL tmp; - iter.reduce_begin(); - reduce(&key->key, &iter, &tmp); - iter.reduce_end(std::move(tmp)); - } while (iter.count() > 1); - - list_add_tail(&iter.value()->list, &key->value_list); - key->value_cnt++; - } - - value = list_entry(key->value_list.next, __ReduceValue, list); - list_del(&value->list); - result->emplace_back(std::move(key->key), std::move(value->value)); - delete value; - - p = rb_next(p); - rb_erase(&key->rb, &this->key_tree); - delete key; - } -} - -template -Reducer::~Reducer() -{ - __ReduceKey *entry; - - while (this->key_tree.rb_node) - { - using TYPE = __ReduceKey; - entry = rb_entry(this->key_tree.rb_node, TYPE, rb); - rb_erase(&entry->rb, &this->key_tree); - delete entry; - } -} - -} - diff --git a/src/factory/WFAlgoTaskFactory.h b/src/factory/WFAlgoTaskFactory.h index e85c0d3c7a..301615dbbf 100644 --- a/src/factory/WFAlgoTaskFactory.h +++ b/src/factory/WFAlgoTaskFactory.h @@ -23,7 +23,6 @@ #include #include #include "WFTask.h" -#include "MapReduce.h" namespace algorithm { @@ -181,12 +180,6 @@ using WFRotateTask = WFThreadTask, template using rotate_callback_t = std::function *)>; -template -using WFReduceTask = WFThreadTask, - algorithm::ReduceOutput>; -template -using reduce_callback_t = std::function *)>; - class WFAlgoTaskFactory { public: @@ -258,23 +251,6 @@ class WFAlgoTaskFactory static WFRotateTask *create_rotate_task(const std::string& queue_name, T *first, T *middle, T *last, CB callback); - - template, - class CB = reduce_callback_t> - static WFReduceTask * - create_reduce_task(const 
std::string& queue_name, - RED reduce, - CB callback); - - template, - class CB = reduce_callback_t> - static WFReduceTask * - create_reduce_task(const std::string& queue_name, - algorithm::ReduceInput input, - RED reduce, - CB callback); }; #include "WFAlgoTaskFactory.inl" diff --git a/src/factory/WFAlgoTaskFactory.inl b/src/factory/WFAlgoTaskFactory.inl index 8184777aff..e70a0599ab 100644 --- a/src/factory/WFAlgoTaskFactory.inl +++ b/src/factory/WFAlgoTaskFactory.inl @@ -652,76 +652,3 @@ WFRotateTask *WFAlgoTaskFactory::create_rotate_task(const std::string& name, std::move(callback)); } -/****************** MapReduce ******************/ - -template -class __WFReduceTask : public WFReduceTask -{ -protected: - virtual void execute(); - -protected: - algorithm::reduce_function_t reduce; - -public: - __WFReduceTask(ExecQueue *queue, Executor *executor, - algorithm::reduce_function_t&& red, - reduce_callback_t&& cb) : - WFReduceTask(queue, executor, std::move(cb)), - reduce(std::move(red)) - { - } - - __WFReduceTask(ExecQueue *queue, Executor *executor, - algorithm::ReduceInput&& input, - algorithm::reduce_function_t&& red, - reduce_callback_t&& cb) : - WFReduceTask(queue, executor, std::move(cb)), - reduce(std::move(red)) - { - this->input = std::move(input); - } -}; - -template -void __WFReduceTask::execute() -{ - algorithm::Reducer reducer; - auto iter = this->input.begin(); - - while (iter != this->input.end()) - { - reducer.insert(std::move(iter->first), std::move(iter->second)); - iter++; - } - - this->input.clear(); - reducer.start(this->reduce, &this->output); -} - -template -WFReduceTask * -WFAlgoTaskFactory::create_reduce_task(const std::string& name, - RED reduce, - CB callback) -{ - return new __WFReduceTask(WFGlobal::get_exec_queue(name), - WFGlobal::get_compute_executor(), - std::move(reduce), - std::move(callback)); -} - -template -WFReduceTask * -WFAlgoTaskFactory::create_reduce_task(const std::string& name, - algorithm::ReduceInput input, - RED 
reduce, - CB callback) -{ - return new __WFReduceTask(WFGlobal::get_exec_queue(name), - WFGlobal::get_compute_executor(), - std::move(input), - std::move(reduce), - std::move(callback)); -} - diff --git a/src/include/workflow/MapReduce.h b/src/include/workflow/MapReduce.h deleted file mode 120000 index 808251d40f..0000000000 --- a/src/include/workflow/MapReduce.h +++ /dev/null @@ -1 +0,0 @@ -../../algorithm/MapReduce.h \ No newline at end of file diff --git a/src/include/workflow/MapReduce.inl b/src/include/workflow/MapReduce.inl deleted file mode 120000 index b196b71967..0000000000 --- a/src/include/workflow/MapReduce.inl +++ /dev/null @@ -1 +0,0 @@ -../../algorithm/MapReduce.inl \ No newline at end of file diff --git a/tutorial/CMakeLists.txt b/tutorial/CMakeLists.txt index 3a17c521d0..e20b283f1f 100644 --- a/tutorial/CMakeLists.txt +++ b/tutorial/CMakeLists.txt @@ -43,7 +43,6 @@ set(TUTORIAL_LIST tutorial-11-graph_task tutorial-15-name_service tutorial-17-dns_cli - tutorial-20-reducer ) if (APPLE) diff --git a/tutorial/tutorial-20-reducer.cc b/tutorial/tutorial-20-reducer.cc deleted file mode 100644 index 26c172c84b..0000000000 --- a/tutorial/tutorial-20-reducer.cc +++ /dev/null @@ -1,90 +0,0 @@ -/* - Copyright (c) 2021 Sogou, Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
- - Author: Xie Han (xiehan@sogou-inc.com;63350856@qq.com) -*/ - -#include -#include -#include -#include "workflow/WFFacilities.h" -#include "workflow/WFAlgoTaskFactory.h" - -using namespace algorithm; - -struct array -{ - int *a; - int n; - size_t size() { return n; } -}; - -void reduce(const int *key, ReduceIterator *iter, array *res) -{ - const array *v1 = iter->next(); - const array *v2 = iter->next(); - - res->a = new int[v1->n + v2->n]; - res->n = v1->n + v2->n; - std::merge(v1->a, v1->a + v1->n, v2->a, v2->a + v2->n, res->a); - delete []v1->a; - delete []v2->a; -} - -WFFacilities::WaitGroup wait_group(1); - -void callback(WFReduceTask *task) -{ - ReduceOutput& output = *task->get_output(); - array& res = output[0].second; - - for (int i = 0; i < res.n; i++) - printf("%d ", res.a[i]); - - printf("\n"); - delete []res.a; - wait_group.done(); -} - -int main(int argc, char *argv[]) -{ - ReduceInput input; - array arr; - int i; - - if (argc != 2) - { - fprintf(stderr, "USAGE: %s \n", argv[0]); - exit(1); - } - - int n = atoi(argv[1]); - - for (i = 0; i < n; i++) - { - arr.n = 1; - arr.a = new int[1]; - arr.a[0] = rand() % 65536; - input.emplace_back(0, arr); - } - - auto *task = WFAlgoTaskFactory::create_reduce_task("sort", std::move(input), - reduce, callback); - task->start(); - - wait_group.wait(); - return 0; -} -