diff --git a/.gitignore b/.gitignore index cf34afd7fd..afd9d95f4f 100644 --- a/.gitignore +++ b/.gitignore @@ -10,4 +10,5 @@ benchmark.json .idea *.xcodeproj *.xcworkspace +src/lib/network/generated .vscode diff --git a/.gitmodules b/.gitmodules index 335eee4e75..39d7350151 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,6 +1,9 @@ [submodule "third_party/googletest"] path = third_party/googletest url = https://github.com/google/googletest.git +[submodule "third_party/grpc"] + path = third_party/grpc + url = https://github.com/grpc/grpc [submodule "third_party/benchmark"] path = third_party/benchmark url = https://github.com/google/benchmark.git diff --git a/README.md b/README.md index 69fa84790d..9b78af649e 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,25 @@ get via `git submodule update --init` install via homebrew: brew install tbb install via apt: apt-get install libtbb-dev +### development command line tools +install via `xcode-select --install` / `apt install build-essential` + +### autoconf +install via homebrew / packet manager + +### automake +install via homebrew / packet manager (installed as default by Ubuntu) + +### libtool +install via homebrew / packet manager + +### pkg-config +install via homebrew / packet manager (installed as default by Ubuntu) + +### get and compile protoc and gRPC +get via `submodule update --init --recursive` +installation guide on github + ### llvm (optional) install via homebrew / packet manager used for AddressSanitizer diff --git a/install.sh b/install.sh index 0622b7156e..0beaffb7a8 100755 --- a/install.sh +++ b/install.sh @@ -13,9 +13,14 @@ if echo $REPLY | grep -E '^[Yy]$' > /dev/null; then echo "Installing dependencies (this may take a while)..." if brew update >/dev/null; then # python2.7 is preinstalled on macOS - if brew install premake boost gcc clang-format gcovr tbb; then - if git submodule update --init; then - echo "Installation successful" + if brew install premake boost gcc clang-format gcovr tbb autoconf automake libtool pkg-config; then + if git submodule update --init --recursive; then + if CPPFLAGS="-Wno-deprecated-declarations" CFLAGS="-Wno-deprecated-declarations -Wno-implicit-function-declaration -Wno-shift-negative-value" make static --directory=third_party/grpc REQUIRE_CUSTOM_LIBRARIES_opt=true; then + echo "Installation successful" + else + echo "Error during gRPC installation." + exit 1 + fi else echo "Error during installation." exit 1 @@ -32,9 +37,14 @@ if echo $REPLY | grep -E '^[Yy]$' > /dev/null; then if cat /etc/lsb-release | grep DISTRIB_ID | grep Ubuntu >/dev/null; then echo "Installing dependencies (this may take a while)..." if sudo apt-get update >/dev/null; then - if sudo apt-get install -y premake4 libboost-all-dev clang-format gcovr python2.7 gcc-6 clang llvm libnuma-dev libnuma1 libtbb-dev; then - if git submodule update --init; then - echo "Installation successful." + if sudo apt-get install -y premake4 libboost-all-dev clang-format gcovr python2.7 gcc-6 clang llvm libnuma-dev libnuma1 libtbb-dev build-essential autoconf libtool; then + if git submodule update --init --recursive; then + if CPPFLAGS="-Wno-deprecated-declarations" CFLAGS="-Wno-deprecated-declarations -Wno-implicit-function-declaration -Wno-shift-negative-value" make static --directory=third_party/grpc REQUIRE_CUSTOM_LIBRARIES_opt=true; then + echo "Installation successful" + else + echo "Error during gRPC installation." + exit 1 + fi else echo "Error during installation." exit 1 diff --git a/premake4.lua b/premake4.lua index d06b21ed3f..543bac72ae 100644 --- a/premake4.lua +++ b/premake4.lua @@ -68,6 +68,12 @@ else end end +-- Generate C++ source files from protobuf grammar files +os.execute("echo \"Generating protobuf and grpc files...\"") +os.execute("[ -d src/lib/network/generated ] || mkdir src/lib/network/generated") +os.execute("./third_party/grpc/bins/opt/protobuf/protoc --cpp_out=./src/lib/network/generated -I=\"./src/lib/network/protos/\" ./src/lib/network/protos/opossum.proto") +os.execute("./third_party/grpc/bins/opt/protobuf/protoc --grpc_out=./src/lib/network/generated --plugin=protoc-gen-grpc=./third_party/grpc/bins/opt/grpc_cpp_plugin -I=\"./src/lib/network/protos/\" ./src/lib/network/protos/opossum.proto") + solution "opossum" configurations { "Debug", "Release" } flags { "FatalWarnings", "ExtraWarnings" } @@ -106,8 +112,8 @@ solution "opossum" project "googletest" kind "StaticLib" - files { "third_party/googletest/googletest/src/gtest-all.cc" } includedirs { "third_party/googletest/googletest", "third_party/googletest/googletest/include" } + files { "third_party/googletest/googletest/src/gtest-all.cc" } project "googlebenchmark" kind "StaticLib" @@ -120,25 +126,45 @@ project "googlebenchmark" project "opossum" kind "StaticLib" - files { "src/lib/**.hpp", "src/lib/**.cpp", "src/bin/server.cpp" } + includedirs { "third_party/grpc/include/", "third_party/grpc/third_party/protobuf/src/" } + files { "src/lib/**.hpp", "src/lib/**.cpp" } project "opossum-asan" kind "StaticLib" buildoptions {"-fsanitize=address -fno-omit-frame-pointer"} linkoptions {"-fsanitize=address"} - files { "src/lib/**.hpp", "src/lib/**.cpp", "src/bin/server.cpp" } + includedirs { "third_party/grpc/include/", "third_party/grpc/third_party/protobuf/src/" } + files { "src/lib/**.hpp", "src/lib/**.cpp", "src/bin/server_main.cpp" } project "opossumCoverage" kind "StaticLib" buildoptions { "-fprofile-arcs -ftest-coverage" } linkoptions { "-lgcov --coverage" } + includedirs { "third_party/grpc/include/", "third_party/grpc/third_party/protobuf/src/" } files { "src/lib/**.hpp", "src/lib/**.cpp" } +-- Static lib for the opossum protobuf and grpc code generated from opossum.proto (see action 'protoc' below) +project "opossumProtobuf" + kind "StaticLib" + buildoptions ("-Wno-unused-parameter -Wno-deprecated-declarations") + includedirs { "third_party/grpc/include/", "third_party/grpc/third_party/protobuf/src/" } + files { "src/lib/network/generated/**.pb.cc" } + +-- Exemplary opossum client, showing how to use grpc and protobuf at client-side +project "client" + kind "ConsoleApp" + links { "opossumProtobuf", "protobuf", "grpc++", "grpc", "z", "boost_program_options" } + includedirs { "third_party/grpc/include/", "third_party/grpc/third_party/protobuf/src/" } + libdirs { "third_party/grpc/libs/opt/", "third_party/grpc/libs/opt/protobuf" } + files { "src/bin/client.cpp" } + project "server" kind "ConsoleApp" - links { "opossum" } + links { "opossum", "opossumProtobuf", "protobuf", "grpc++", "grpc", "z", "boost_program_options" } -- z is needed on macos to link grpc + includedirs { "third_party/grpc/include/", "third_party/grpc/third_party/protobuf/src/" } + libdirs { "third_party/grpc/libs/opt/", "third_party/grpc/libs/opt/protobuf" } links(libs) - files { "src/bin/server.cpp" } + files { "src/bin/server_main.cpp" } project "playground" kind "ConsoleApp" @@ -149,19 +175,21 @@ project "playground" project "test" kind "ConsoleApp" - links { "opossum", "googletest" } + links { "opossum", "googletest", "opossumProtobuf", "protobuf", "grpc++", "grpc", "z" } + includedirs { "third_party/googletest/googletest/include", "third_party/grpc/include/", "third_party/grpc/third_party/protobuf/src/" } + libdirs { "third_party/grpc/libs/opt/", "third_party/grpc/libs/opt/protobuf" } links(libs) files { "src/test/**.hpp", "src/test/**.cpp" } - includedirs { "third_party/googletest/googletest/include" } postbuildcommands { "./build/test" } project "asan" kind "ConsoleApp" - links { "opossum-asan", "googletest" } + links { "opossum-asan", "googletest", "opossumProtobuf", "protobuf", "grpc++", "grpc", "z" } links(libs) files { "src/test/**.hpp", "src/test/**.cpp" } - includedirs { "third_party/googletest/googletest/include" } + includedirs { "third_party/googletest/googletest/include", "third_party/grpc/include/", "third_party/grpc/third_party/protobuf/src/" } + libdirs { "third_party/grpc/libs/opt/", "third_party/grpc/libs/opt/protobuf" } buildoptions {"-fsanitize=address -fno-omit-frame-pointer"} linkoptions { "-fsanitize=address" } postbuildcommands { "./build/asan" } @@ -177,13 +205,14 @@ project "benchmark" project "coverage" kind "ConsoleApp" - links { "opossumCoverage", "googletest" } + links { "opossumCoverage", "googletest", "opossumProtobuf", "protobuf", "grpc++", "grpc", "z" } links(libs) linkoptions {"--coverage"} files { "src/test/**.hpp", "src/test/**.cpp" } buildoptions { "-fprofile-arcs -ftest-coverage" } - includedirs { "third_party/googletest/googletest/include" } - postbuildcommands { "./build/coverage && rm -fr coverage; mkdir coverage && gcovr -s -r . --exclude=\"(.*types*.|.*test*.)\" --html --html-details -o coverage/index.html" } + includedirs { "third_party/googletest/googletest/include", "third_party/grpc/include/", "third_party/grpc/third_party/protobuf/src/" } + libdirs { "third_party/grpc/libs/opt/", "third_party/grpc/libs/opt/protobuf" } + postbuildcommands { "./build/coverage && rm -fr coverage; mkdir coverage && gcovr -s -r . --exclude=\"(.*types*.|.*test*.|.*\.pb\.|third_party)\" --html --html-details -o coverage/index.html" } newoption { trigger = "compiler", @@ -229,7 +258,7 @@ function premake.generate(obj, filename, callback) -- "make clean" should also call "premake4 clean" os.execute("awk '\\\ /help:/ {\\\ - print \"\tpremake4 clean\"\\\ + print \"\tpremake4 clean\\n\trm -r src/lib/network/generated\"\\\ }\\\ { print }' Makefile > Makefile.awk && mv Makefile.awk Makefile") end diff --git a/src/bin/client.cpp b/src/bin/client.cpp new file mode 100644 index 0000000000..4dbbd97854 --- /dev/null +++ b/src/bin/client.cpp @@ -0,0 +1,168 @@ +#include "client.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace po = boost::program_options; + +namespace opossum { + +// Sample client showing how to use protobuf and grpc at client-side +// This client uses a synchronous, blocking grpc-call, for an async client sample, see network-tests +OpossumClient::OpossumClient(std::shared_ptr channel) : _stub(proto::OpossumService::NewStub(channel)) {} + +// Assembles the client's payload, sends it and presents the response back from the server. +void OpossumClient::query(std::string& table_name, std::string& column_name, std::string& filter_op, + std::string& filter) { + // Data we are sending to the server. + proto::Request request; + + proto::GetTableOperator* get_table = nullptr; + auto root_op_variant = request.mutable_root_operator(); + + if (!column_name.empty() && !filter_op.empty() && !filter.empty()) { + // Init a TableScan (protobuf allocates and manages the needed resources) + proto::TableScanOperator* table_scan = root_op_variant->mutable_table_scan(); + table_scan->set_column_name(column_name); + table_scan->set_filter_operator(filter_op); + proto::Variant* variant = table_scan->mutable_value(); + variant->set_value_int(std::stoi(filter)); + // Add a GetTable operator as input operator for TableScan + get_table = table_scan->mutable_input_operator()->mutable_get_table(); + } + + if (!get_table) { + // Init a GetTable operator if there is no TableScan already + get_table = root_op_variant->mutable_get_table(); + } + get_table->set_table_name(table_name); + + // Container for the data we expect from the server. + proto::Response response; + + // Context for the client. It could be used to convey extra information to + // the server and/or tweak certain RPC behaviors. + ClientContext context; + + // The actual RPC (synchronous). + Status status = _stub->Query(&context, request, &response); + + // Act upon the status of the actual RPC. + if (status.ok()) { + print_response_table(response); + } else { + std::cout << "RPC failed" << std::endl; + } +} + +void OpossumClient::print_variant(const proto::Variant& variant) const { + std::cout << "|" << std::setw(20); + switch (variant.variant_case()) { + case proto::Variant::kValueInt: + std::cout << variant.value_int(); + break; + case proto::Variant::kValueFloat: + std::cout << variant.value_float(); + break; + case proto::Variant::kValueString: + std::cout << variant.value_string(); + break; + case proto::Variant::kValueDouble: + std::cout << variant.value_double(); + break; + case proto::Variant::kValueLong: + std::cout << variant.value_long(); + break; + default: + throw std::runtime_error("Unknown AllTypeVariant in operator_translator"); + } + std::cout << std::setw(0); +} + +void OpossumClient::print_response_table(proto::Response& response) const { + if (response.result_case() == proto::Response::kError) { + std::cout << "Error in request: " << response.error() << std::endl; + return; + } + + const auto table = response.response_table(); + std::cout << "=== Columns" + << " === " << std::endl; + for (int i = 0; i < table.column_type_size(); ++i) { + std::cout << "|" << std::setw(20) << table.column_type(i) << std::setw(0); + } + std::cout << "|" << std::endl; + for (int i = 0; i < table.column_name_size(); ++i) { + std::cout << "|" << std::setw(20) << table.column_name(i) << std::setw(0); + } + std::cout << "|" << std::endl; + std::cout << "=== Values" + << " === " << std::endl; + for (int row_id = 0; row_id < table.row_size(); ++row_id) { + auto row = table.row(row_id); + for (int i = 0; i < row.value_size(); ++i) { + auto variant = row.value(i); + print_variant(variant); + } + std::cout << "|" << std::endl; + } +} + +} // namespace opossum + +int main(int argc, char** argv) { + po::options_description desc("Allowed options"); + auto options = desc.add_options(); + options("help", "print help message"); + options("table_name", po::value(), "opossum table name (required)"); + options("address", po::value()->default_value("0.0.0.0:50051"), "IP:PORT"); + options("column_name", po::value()->default_value(""), "column name for table scan"); + options("filter_op", po::value()->default_value(""), "filter operation for table scan, e.g. = > >= ..."); + options("filter_val", po::value()->default_value(""), "filter value for table scan"); + + po::positional_options_description pd; + pd.add("table_name", 1); + pd.add("column_name", 1); + pd.add("filter_op", 1); + pd.add("filter_val", 1); + + po::variables_map variables; + po::store(po::command_line_parser(argc, argv).options(desc).positional(pd).run(), variables); + po::notify(variables); + + if (variables.count("help") || variables.count("table_name") == 0) { + std::cout << desc << std::endl; + return 1; + } + + auto address = variables["address"].as(); + auto table_name = variables["table_name"].as(); + auto column_name = variables["column_name"].as(); + auto filter_op = variables["filter_op"].as(); + auto filter = variables["filter_val"].as(); + + // Instantiate the client. It requires a channel, out of which the actual RPCs + // are created. This channel models a connection to an endpoint. We indicate that the channel isn't authenticated (use + // of InsecureChannelCredentials()). + opossum::OpossumClient client(grpc::CreateChannel(address, grpc::InsecureChannelCredentials())); + + std::cout << "Sending query to " << address << std::endl; + client.query(table_name, column_name, filter_op, filter); + + return 0; +} diff --git a/src/bin/client.hpp b/src/bin/client.hpp new file mode 100644 index 0000000000..e704915454 --- /dev/null +++ b/src/bin/client.hpp @@ -0,0 +1,39 @@ +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-parameter" +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" +#include +#pragma GCC diagnostic pop +#include + +#include +#include + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-parameter" +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" +#include "network/generated/opossum.grpc.pb.h" +#pragma GCC diagnostic pop + +using grpc::Channel; +using grpc::ClientContext; +using grpc::Status; + +namespace opossum { + +// Sample client showing how to use protobuf and grpc at client-side +class OpossumClient { + public: + explicit OpossumClient(std::shared_ptr channel); + + // Assembles the client's payload, sends it and presents the response back from the server. + void query(std::string& table_name, std::string& column_name, std::string& filter_op, std::string& filter); + + protected: + void print_response_table(proto::Response& response) const; + void print_variant(const proto::Variant& variant) const; + // Out of the passed in Channel comes the stub, stored here, our view of the + // server's exposed services. + std::unique_ptr _stub; +}; + +} // namespace opossum diff --git a/src/bin/server.cpp b/src/bin/server.cpp deleted file mode 100644 index a0706a1db7..0000000000 --- a/src/bin/server.cpp +++ /dev/null @@ -1,7 +0,0 @@ -#include - -int main() { - std::cout << "Built as: " << (IS_DEBUG ? "debug" : "release") << std::endl; - std::cout << "This is the server. As of now, it does nothing." << std::endl; - return 0; -} diff --git a/src/bin/server_main.cpp b/src/bin/server_main.cpp new file mode 100644 index 0000000000..9878faa343 --- /dev/null +++ b/src/bin/server_main.cpp @@ -0,0 +1,131 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "network/server.hpp" +#include "operators/import_csv.hpp" +#include "scheduler/topology.hpp" +#include "storage/storage_manager.hpp" + +namespace po = boost::program_options; + +namespace { +opossum::Server server; +volatile std::sig_atomic_t count_sigint{0}; + +void sighandler(int signum) { + switch (signum) { + case SIGINT: + if (count_sigint++ == 0) { + server.stop(); + } else { + exit(1); + } + break; + } +} + +void import_dummy_data(const std::string& directory, const std::string& filename, const std::string& table_name) { + auto importer = std::make_shared(directory, filename); + importer->execute(); + + // Cannot use importer->get_output() to store the table directly, because it is const. Need to copy values. + const auto t = importer->get_output(); + auto table = std::make_shared(); + for (size_t i = 0; i < t->col_count(); ++i) { + table->add_column(t->column_name(i), t->column_type(i)); + } + for (size_t i = 0; i < t->chunk_count(); ++i) { + auto& chunk = t->get_chunk(i); + for (size_t row_id = 0; row_id < chunk.size(); ++row_id) { + auto row = std::vector(); + for (size_t col_id = 0; col_id < t->col_count(); ++col_id) { + row.push_back((*chunk.get_column(col_id))[row_id]); + } + table->append(row); + } + } + opossum::StorageManager::get().add_table(table_name, table); +} +} // namespace + +int main(int argc, char** argv) { + std::signal(SIGINT, sighandler); + + opossum::ServerConfiguration config; + + po::options_description desc("Allowed options"); + auto options = desc.add_options(); + options("help", "print help message"); + options("address", po::value()->default_value("0.0.0.0:50051"), "IP:PORT"); + options("listener_threads", po::value()->default_value(1), "Number of threads listening for client requests"); + options("skip_scheduler", + "Skip scheduling and add response to sender queue immediately - useful for grpc no-op benchmarking"); + options("numa_max_cores", po::value()->default_value(0), + "Number of max cores used - zero indicates no limit"); + options("fake_numa", "Use a fake numa topology"); + options("fake_numa_max_workers", po::value()->default_value(0), + "Number of max workers used - zero indicates no limit"); + options("fake_numa_workers_per_node", po::value()->default_value(1), "Number of max workers per node"); + if (IS_DEBUG) { + options("csv_import_dir", po::value()->default_value("src/test/csv"), "CSV import folder"); + options("csv_import_filename", po::value()->default_value("float_int"), + "Filename (without .csv) - file is imported once at server start"); + options("csv_import_table_name", po::value()->default_value("table1"), "Import target table name"); + } + + po::variables_map variables; + po::store(po::parse_command_line(argc, argv, desc), variables); + po::notify(variables); + + if (variables.count("help")) { + std::cout << desc << std::endl; + return 1; + } + + config.address = variables["address"].as(); + config.num_listener_threads = variables["listener_threads"].as(); + config.skip_scheduler = variables.count("skip_scheduler") > 0; + + if (variables.count("fake_numa")) { + config.topology = opossum::Topology::create_fake_numa_topology( + variables["fake_numa_max_workers"].as(), variables["fake_numa_workers_per_node"].as()); + } else { + config.topology = opossum::Topology::create_numa_topology(variables["numa_max_cores"].as()); + } + + if (IS_DEBUG) { + try { + // Provide some dummy data during development - can be removed when persistance is implemented + import_dummy_data(variables["csv_import_dir"].as(), + variables["csv_import_filename"].as(), + variables["csv_import_table_name"].as()); + } catch (const std::ios_base::failure e) { + std::cerr << "Unable to import CSV: " << variables["csv_import_dir"].as() << "/" + << variables["csv_import_filename"].as() << std::endl; + std::cerr << "Error: " << e.what() << std::endl; + std::cerr << "Dummy data import skipped" << std::endl; + } + } + + server.start(config); + + return 0; +} diff --git a/src/lib/network/operator_translator.cpp b/src/lib/network/operator_translator.cpp new file mode 100644 index 0000000000..85ce85f13a --- /dev/null +++ b/src/lib/network/operator_translator.cpp @@ -0,0 +1,363 @@ +#include "operator_translator.hpp" + +#include +#include +#include +#include + +#include "../common.hpp" +#include "../operators/abstract_join_operator.hpp" +#include "../operators/abstract_operator.hpp" +#include "../operators/difference.hpp" +#include "../operators/export_binary.hpp" +#include "../operators/export_csv.hpp" +#include "../operators/get_table.hpp" +#include "../operators/import_csv.hpp" +#include "../operators/index_column_scan.hpp" +#include "../operators/join_nested_loop_a.hpp" +#include "../operators/print.hpp" +#include "../operators/product.hpp" +#include "../operators/projection.hpp" +#include "../operators/sort.hpp" +#include "../operators/table_scan.hpp" +#include "../operators/union_all.hpp" +#include "../types.hpp" + +namespace opossum { + +inline AllTypeVariant translate_variant(const proto::Variant& variant) { + switch (variant.variant_case()) { + case proto::Variant::kValueInt: + return variant.value_int(); + case proto::Variant::kValueFloat: + return variant.value_float(); + case proto::Variant::kValueString: + return variant.value_string(); + case proto::Variant::kValueDouble: + return variant.value_double(); + case proto::Variant::kValueLong: + return variant.value_long(); + default: + throw std::runtime_error("Unknown AllTypeVariant in operator_translator"); + } +} + +inline optional translate_optional_variant(const proto::Variant& variant) { + if (variant.variant_case() == proto::Variant::VARIANT_NOT_SET) + return nullopt; + else + return translate_variant(variant); +} + +inline std::shared_ptr OperatorTranslator::translate( + const proto::ProjectionOperator& projection_operator) { + const auto column_names_field = projection_operator.column_name(); + auto column_names = std::vector(std::begin(column_names_field), std::end(column_names_field)); + if (!projection_operator.has_input_operator()) { + throw std::runtime_error("Missing Input Operator in Projection."); + } + auto input_task = translate_proto(projection_operator.input_operator()); + + auto projection = std::make_shared(input_task->get_operator(), column_names); + auto projection_task = std::make_shared(projection); + input_task->set_as_predecessor_of(projection_task); + _tasks.push_back(projection_task); + + return projection_task; +} + +inline std::shared_ptr OperatorTranslator::translate(const proto::ProductOperator& product_operator) { + auto& prefix_left = product_operator.prefix_left(); + auto& prefix_right = product_operator.prefix_right(); + + if (!product_operator.has_left_operator()) { + throw std::runtime_error("Missing Left Operator in Product."); + } + auto input_left_task = translate_proto(product_operator.left_operator()); + if (!product_operator.has_right_operator()) { + throw std::runtime_error("Missing Right Operator in Product."); + } + auto input_right_task = translate_proto(product_operator.right_operator()); + + auto product = std::make_shared(input_left_task->get_operator(), input_right_task->get_operator(), + prefix_left, prefix_right); + auto product_task = std::make_shared(product); + input_left_task->set_as_predecessor_of(product_task); + input_right_task->set_as_predecessor_of(product_task); + _tasks.push_back(product_task); + + return product_task; +} + +inline std::shared_ptr OperatorTranslator::translate( + const proto::NestedLoopJoinOperator& nested_loop_join_operator) { + auto& prefix_left = nested_loop_join_operator.prefix_left(); + auto& prefix_right = nested_loop_join_operator.prefix_right(); + + if (!nested_loop_join_operator.has_left_operator()) { + throw std::runtime_error("Missing Left Operator in Nested Loop Join."); + } + if (!nested_loop_join_operator.has_right_operator()) { + throw std::runtime_error("Missing Right Operator in Nested Loop Join."); + } + auto input_left_task = translate_proto(nested_loop_join_operator.left_operator()); + auto input_right_task = translate_proto(nested_loop_join_operator.right_operator()); + auto& op = nested_loop_join_operator.op(); + + JoinMode join_mode; + switch (nested_loop_join_operator.mode()) { + case proto::NestedLoopJoinOperator::Inner: + join_mode = JoinMode::Inner; + break; + case proto::NestedLoopJoinOperator::Left: + join_mode = JoinMode::Left; + break; + case proto::NestedLoopJoinOperator::Right: + join_mode = JoinMode::Right; + break; + case proto::NestedLoopJoinOperator::Outer: + join_mode = JoinMode::Outer; + break; + case proto::NestedLoopJoinOperator::Cross: + join_mode = JoinMode::Cross; + break; + case proto::NestedLoopJoinOperator::Natural: + join_mode = JoinMode::Natural; + break; + case proto::NestedLoopJoinOperator::Self: + join_mode = JoinMode::Self; + break; + default: + throw std::runtime_error("Unknown join mode for nested loop join operator in operator_translator"); + } + + std::shared_ptr nested_loop_join; + + if (!nested_loop_join_operator.left_column_name().empty() && !nested_loop_join_operator.right_column_name().empty()) { + auto column_names = + std::make_pair(nested_loop_join_operator.left_column_name(), nested_loop_join_operator.right_column_name()); + auto join_columns = optional>(column_names); + nested_loop_join = + std::make_shared(input_left_task->get_operator(), input_right_task->get_operator(), + join_columns, op, join_mode, prefix_left, prefix_right); + } else { + nested_loop_join = + std::make_shared(input_left_task->get_operator(), input_right_task->get_operator(), nullopt, + op, join_mode, prefix_left, prefix_right); + } + + auto nested_loop_join_task = std::make_shared(nested_loop_join); + input_left_task->set_as_predecessor_of(nested_loop_join_task); + input_right_task->set_as_predecessor_of(nested_loop_join_task); + _tasks.push_back(nested_loop_join_task); + + return nested_loop_join_task; +} + +inline std::shared_ptr OperatorTranslator::translate( + const proto::TableScanOperator& table_scan_operator) { + const auto& column_name = table_scan_operator.column_name(); + const auto& filter_op = table_scan_operator.filter_operator(); + if (!table_scan_operator.has_input_operator()) { + throw std::runtime_error("Missing Input Operator in Table Scan."); + } + auto input_task = translate_proto(table_scan_operator.input_operator()); + + const auto value = translate_variant(table_scan_operator.value()); + const auto value2 = translate_optional_variant(table_scan_operator.value2()); + + auto table_scan = std::make_shared(input_task->get_operator(), column_name, filter_op, value, value2); + auto scan_task = std::make_shared(table_scan); + input_task->set_as_predecessor_of(scan_task); + _tasks.push_back(scan_task); + + return scan_task; +} + +inline std::shared_ptr OperatorTranslator::translate( + const proto::IndexColumnScanOperator& index_column_scan_operator) { + const auto& column_name = index_column_scan_operator.column_name(); + const auto& filter_operator = index_column_scan_operator.filter_operator(); + if (!index_column_scan_operator.has_input_operator()) { + throw std::runtime_error("Missing Input Operator in Index Column Scan."); + } + auto input_task = translate_proto(index_column_scan_operator.input_operator()); + + const auto value = translate_variant(index_column_scan_operator.value()); + const auto value2 = translate_optional_variant(index_column_scan_operator.value2()); + + auto index_column_scan = + std::make_shared(input_task->get_operator(), column_name, filter_operator, value, value2); + auto index_column_scan_task = std::make_shared(index_column_scan); + input_task->set_as_predecessor_of(index_column_scan_task); + _tasks.push_back(index_column_scan_task); + + return index_column_scan_task; +} + +inline std::shared_ptr OperatorTranslator::translate(const proto::GetTableOperator& get_table_operator) { + auto get_table = std::make_shared(get_table_operator.table_name()); + auto task = std::make_shared(get_table); + _tasks.push_back(task); + + return task; +} + +inline std::shared_ptr OperatorTranslator::translate(const proto::SortOperator& sort_operator) { + const auto& column_name = sort_operator.column_name(); + const auto ascending = sort_operator.ascending(); + if (!sort_operator.has_input_operator()) { + throw std::runtime_error("Missing Input Operator in Sort."); + } + auto input_task = translate_proto(sort_operator.input_operator()); + + auto sort = std::make_shared(input_task->get_operator(), column_name, ascending); + auto sort_task = std::make_shared(sort); + input_task->set_as_predecessor_of(sort_task); + _tasks.push_back(sort_task); + + return sort_task; +} + +inline std::shared_ptr OperatorTranslator::translate(const proto::UnionAllOperator& union_all_operator) { + if (!union_all_operator.has_input_operator1()) { + throw std::runtime_error("Missing Input Operator 1 in Union All."); + } + auto input_task1 = translate_proto(union_all_operator.input_operator1()); + if (!union_all_operator.has_input_operator2()) { + throw std::runtime_error("Missing Input Operator 2 in Union All."); + } + auto input_task2 = translate_proto(union_all_operator.input_operator2()); + + auto union_all = std::make_shared(input_task1->get_operator(), input_task2->get_operator()); + auto union_all_task = std::make_shared(union_all); + input_task1->set_as_predecessor_of(union_all_task); + input_task2->set_as_predecessor_of(union_all_task); + _tasks.push_back(union_all_task); + + return union_all_task; +} + +inline std::shared_ptr OperatorTranslator::translate( + const proto::DifferenceOperator& difference_operator) { + if (!difference_operator.has_left_operator()) { + throw std::runtime_error("Missing Left Operator in Difference."); + } + if (!difference_operator.has_right_operator()) { + throw std::runtime_error("Missing Right Operator in Difference."); + } + auto input_left_task = translate_proto(difference_operator.left_operator()); + auto input_right_task = translate_proto(difference_operator.right_operator()); + + auto difference = std::make_shared(input_left_task->get_operator(), input_right_task->get_operator()); + auto difference_task = std::make_shared(difference); + input_left_task->set_as_predecessor_of(difference_task); + input_right_task->set_as_predecessor_of(difference_task); + _tasks.push_back(difference_task); + + return difference_task; +} + +inline std::shared_ptr OperatorTranslator::translate( + const proto::ImportCsvOperator& import_csv_operation) { + const auto& directory = import_csv_operation.directory(); + const auto& filename = import_csv_operation.filename(); + + auto import_csv = std::make_shared(directory, filename); + auto import_task = std::make_shared(import_csv); + _tasks.push_back(import_task); + + return import_task; +} + +inline std::shared_ptr OperatorTranslator::translate( + const proto::ExportCsvOperator& export_csv_operator) { + const auto& directory = export_csv_operator.directory(); + const auto& filename = export_csv_operator.filename(); + if (!export_csv_operator.has_input_operator()) { + throw std::runtime_error("Missing Input Operator in Export CSV."); + } + auto input_task = translate_proto(export_csv_operator.input_operator()); + auto export_csv = std::make_shared(input_task->get_operator(), directory, filename); + auto export_csv_task = std::make_shared(export_csv); + + input_task->set_as_predecessor_of(export_csv_task); + _tasks.push_back(export_csv_task); + + return export_csv_task; +} + +inline std::shared_ptr OperatorTranslator::translate( + const proto::ExportBinaryOperator& export_binary_operator) { + const auto& filename = export_binary_operator.filename(); + if (!export_binary_operator.has_input_operator()) { + throw std::runtime_error("Missing Input Operator in Export Binary."); + } + auto input_task = translate_proto(export_binary_operator.input_operator()); + auto export_binary = std::make_shared(input_task->get_operator(), filename); + auto export_binary_task = std::make_shared(export_binary); + + input_task->set_as_predecessor_of(export_binary_task); + _tasks.push_back(export_binary_task); + + return export_binary_task; +} + +inline std::shared_ptr OperatorTranslator::translate(const proto::PrintOperator& print_operator) { + if (!print_operator.has_input_operator()) { + throw std::runtime_error("Missing Input Operator in Print."); + } + auto input_task = translate_proto(print_operator.input_operator()); + auto print = std::make_shared(input_task->get_operator()); + auto print_task = std::make_shared(print); + input_task->set_as_predecessor_of(print_task); + _tasks.push_back(print_task); + + return print_task; +} + +const std::vector>& OperatorTranslator::build_tasks_from_proto( + const proto::OperatorVariant& op) { + translate_proto(op); + _root_task = _tasks.back(); + return _tasks; +} + +// Add a case-statement when adding new operators +std::shared_ptr OperatorTranslator::translate_proto(const proto::OperatorVariant& op) { + switch (op.operator_case()) { + case proto::OperatorVariant::kGetTable: + return translate(op.get_table()); + case proto::OperatorVariant::kTableScan: + return translate(op.table_scan()); + case proto::OperatorVariant::kProjection: + return translate(op.projection()); + case proto::OperatorVariant::kProduct: + return translate(op.product()); + case proto::OperatorVariant::kSort: + return translate(op.sort()); + case proto::OperatorVariant::kUnionAll: + return translate(op.union_all()); + case proto::OperatorVariant::kImportCsv: + return translate(op.import_csv()); + case proto::OperatorVariant::kPrint: + return translate(op.print()); + case proto::OperatorVariant::kDifference: + return translate(op.difference()); + case proto::OperatorVariant::kExportCsv: + return translate(op.export_csv()); + case proto::OperatorVariant::kExportBinary: + return translate(op.export_binary()); + case proto::OperatorVariant::kIndexColumnScan: + return translate(op.index_column_scan()); + case proto::OperatorVariant::kNestedLoopJoin: + return translate(op.nested_loop_join()); + case proto::OperatorVariant::OPERATOR_NOT_SET: + throw std::runtime_error( + "Operator not set. Missing dependency. Cannot translate proto object to opossum operator."); + default: + throw std::runtime_error("Unknown operator type in operator_translator"); + } +} + +} // namespace opossum diff --git a/src/lib/network/operator_translator.hpp b/src/lib/network/operator_translator.hpp new file mode 100644 index 0000000000..3ce597bba0 --- /dev/null +++ b/src/lib/network/operator_translator.hpp @@ -0,0 +1,44 @@ +#pragma once + +#include +#include +#include + +#include "../scheduler/operator_task.hpp" +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-parameter" +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" +#include "generated/opossum.pb.h" +#pragma GCC diagnostic pop + +namespace opossum { + +// Translates a Protocol Buffer object tree structure into OperatorTasks with dependencies +class OperatorTranslator { + public: + // Recursively creates Tasks for all input-operators of `op` and a task for `op` itself + const std::vector>& build_tasks_from_proto(const proto::OperatorVariant& op); + // Returns the root task, i.e. the root element of the dependency tree structure. It is the last one to be executed by + // the scheduler. + std::shared_ptr root_task() { return _root_task; } + + protected: + std::shared_ptr translate_proto(const proto::OperatorVariant& op); + inline std::shared_ptr translate(const proto::GetTableOperator&); + inline std::shared_ptr translate(const proto::TableScanOperator&); + inline std::shared_ptr translate(const proto::ProjectionOperator& projection_operator); + inline std::shared_ptr translate(const proto::ProductOperator& product_operator); + inline std::shared_ptr translate(const proto::SortOperator&); + inline std::shared_ptr translate(const proto::UnionAllOperator&); + inline std::shared_ptr translate(const proto::ImportCsvOperator&); + inline std::shared_ptr translate(const proto::PrintOperator&); + inline std::shared_ptr translate(const proto::DifferenceOperator&); + inline std::shared_ptr translate(const proto::ExportCsvOperator&); + inline std::shared_ptr translate(const proto::ExportBinaryOperator&); + inline std::shared_ptr translate(const proto::IndexColumnScanOperator&); + inline std::shared_ptr translate(const proto::NestedLoopJoinOperator&); + std::vector> _tasks; + std::shared_ptr _root_task; +}; + +} // namespace opossum diff --git a/src/lib/network/protos/opossum.proto b/src/lib/network/protos/opossum.proto new file mode 100644 index 0000000000..ac4b98347e --- /dev/null +++ b/src/lib/network/protos/opossum.proto @@ -0,0 +1,154 @@ +syntax = "proto3"; + +package opossum.proto; + +// gRPC service definition +service OpossumService { + rpc Query (Request) returns (Response) {} +} + +// AllTypeVariant "union" +message Variant { + oneof variant { + int32 value_int = 1; + float value_float = 2; + string value_string = 3; + double value_double = 4; + int64 value_long = 5; + }; +} + + +// Request and Operators + +message Request { + OperatorVariant root_operator = 1; +} + +// Add new operators to this "union" +message OperatorVariant { + oneof operator { + GetTableOperator get_table = 1; + ProjectionOperator projection = 2; + TableScanOperator table_scan = 3; + ProductOperator product = 4; + SortOperator sort = 5; + UnionAllOperator union_all = 6; + ImportCsvOperator import_csv = 7; + PrintOperator print = 8; + DifferenceOperator difference = 9; + ExportCsvOperator export_csv = 10; + ExportBinaryOperator export_binary = 11; + IndexColumnScanOperator index_column_scan = 12; + NestedLoopJoinOperator nested_loop_join = 13; + } +} + +message GetTableOperator { + string table_name = 1; +} + +message ProjectionOperator { + OperatorVariant input_operator = 1; + repeated string column_name = 2; +} + +message TableScanOperator { + OperatorVariant input_operator = 1; + string column_name = 2; + string filter_operator = 3; + Variant value = 4; + Variant value2 = 5; // not explicitly specified as optional as everything is optional +} + +message IndexColumnScanOperator { + OperatorVariant input_operator = 1; + string column_name = 2; + string filter_operator = 3; + Variant value = 4; + Variant value2 = 5; // not explicitly specified as optional as everything is optional +} + +message NestedLoopJoinOperator { + OperatorVariant left_operator = 1; + OperatorVariant right_operator = 2; + string left_column_name = 3; + string right_column_name = 4; + string op = 5; + enum JoinMode { + Inner = 0; + Left = 1; + Right = 2; + Outer = 3; + Cross = 4; + Natural = 5; + Self = 6; + } + JoinMode mode = 6; + string prefix_left = 7; + string prefix_right = 8; +} + +message ProductOperator { // 2 inputs + OperatorVariant left_operator = 1; + OperatorVariant right_operator = 2; + string prefix_left = 3; + string prefix_right = 4; +} + +message SortOperator { + OperatorVariant input_operator = 1; + string column_name = 2; + bool ascending = 3; +} + +message UnionAllOperator { // 2 inputs + OperatorVariant input_operator1 = 1; + OperatorVariant input_operator2 = 2; +} + +message ImportCsvOperator { + OperatorVariant input_operator = 1; + string directory = 2; + string filename = 3; +} + +message ExportCsvOperator { + OperatorVariant input_operator = 1; + string directory = 2; + string filename = 3; +} + +message ExportBinaryOperator { + OperatorVariant input_operator = 1; + string filename = 3; +} + +message PrintOperator { + OperatorVariant input_operator = 1; +} + +message DifferenceOperator { + OperatorVariant left_operator = 1; + OperatorVariant right_operator = 2; +} + + +// Response + +message Response { + oneof result { + Table response_table = 1; + string error = 2; + } +} + +message Table { + repeated string column_type = 1; + repeated string column_name = 2; + repeated Row row = 3; +} + +message Row { + repeated Variant value = 1; +} diff --git a/src/lib/network/request_handler.cpp b/src/lib/network/request_handler.cpp new file mode 100644 index 0000000000..faaea7a119 --- /dev/null +++ b/src/lib/network/request_handler.cpp @@ -0,0 +1,79 @@ +#include "request_handler.hpp" + +#include +#include + +#include "network/operator_translator.hpp" +#include "scheduler/job_task.hpp" + +namespace opossum { + +// Take in the "service" instance (in this case representing an asynchronous +// server) and the completion queue "cq" used for asynchronous communication +// with the gRPC runtime. +RequestHandler::RequestHandler() : _responder(&_ctx), _has_new_request(true) {} + +bool RequestHandler::has_new_request() const { return _has_new_request; } + +void RequestHandler::set_new_request_flag(const bool flag) { _has_new_request = flag; } + +proto::Request& RequestHandler::get_request() { return _request; } + +grpc::ServerContext& RequestHandler::get_server_context() { return _ctx; } + +grpc::ServerAsyncResponseWriter& RequestHandler::get_responder() { return _responder; } + +void RequestHandler::parse_and_schedule(const bool skip_scheduler) { + // The actual processing. + if (skip_scheduler) { + // Skip scheduling and send response immediately + send_response(); + return; + } + + // Empty request - a NOOP is scheduled + if (!_request.has_root_operator()) { + auto noop_job = std::make_shared([this]() { send_response(); }); + noop_job->schedule(); + return; + } + + // Parsing of invalid request raises an exception + try { + OperatorTranslator translator; + const auto& tasks = translator.build_tasks_from_proto(_request.root_operator()); + auto root_task = translator.root_task(); + auto root_operator = root_task->get_operator(); + auto materialize_job = std::make_shared([this, root_operator]() { + // These lines are executed by the opossum scheduler + auto table = root_operator->get_output(); + // Materialize and fill response + ResponseBuilder response_builder; + response_builder.build_response(_response, std::move(table)); + + send_response(); + }); + root_task->set_as_predecessor_of(materialize_job); + + // Schedule all tasks + for (const auto& task : tasks) { + task->schedule(); + } + materialize_job->schedule(); + } catch (const std::exception& e) { + if (IS_DEBUG) { + std::cerr << "Exception: " << e.what() << std::endl; + } + _response.set_error(e.what()); + send_response(); + } +} + +void RequestHandler::send_response() { + // And we are done! Let the gRPC runtime know we've finished, using the + // memory address of this instance as the uniquely identifying tag for + // the event. + _responder.Finish(_response, grpc::Status::OK, this); +} + +} // namespace opossum diff --git a/src/lib/network/request_handler.hpp b/src/lib/network/request_handler.hpp new file mode 100644 index 0000000000..d4d78e9e18 --- /dev/null +++ b/src/lib/network/request_handler.hpp @@ -0,0 +1,53 @@ +#pragma once + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-parameter" +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" +#include +#include + +#include "network/generated/opossum.grpc.pb.h" +#pragma GCC diagnostic pop +#include "network/response_builder.hpp" + +namespace opossum { +// Class encompasing the state and logic needed to serve a request. +class RequestHandler { + public: + RequestHandler(); + + bool has_new_request() const; + + void set_new_request_flag(const bool flag); + + proto::Request& get_request(); + + grpc::ServerAsyncResponseWriter& get_responder(); + + grpc::ServerContext& get_server_context(); + + // Translates protocol buffer objects into OperatorTasks and schedules them together with a materialization task + void parse_and_schedule(const bool skip_scheduler); + + // Instructs the gRPC engine to send a response back to the client + void send_response(); + + protected: + // Context for the RPC, allowing to tweak aspects of it such as the use + // of compression, authentication, as well as to send metadata back to the + // client. + grpc::ServerContext _ctx; + + // What we get from the client. + proto::Request _request; + // What we send back to the client. + proto::Response _response; + + // The means to get back to the client. + grpc::ServerAsyncResponseWriter _responder; + + // Is set to 'true' in constructor and set to 'false' the first time it + bool _has_new_request; +}; + +} // namespace opossum diff --git a/src/lib/network/response_builder.hpp b/src/lib/network/response_builder.hpp new file mode 100644 index 0000000000..25bdec26b9 --- /dev/null +++ b/src/lib/network/response_builder.hpp @@ -0,0 +1,127 @@ +#pragma once + +#include +#include + +#include "network/generated/opossum.grpc.pb.h" +#include "storage/column_visitable.hpp" +#include "storage/dictionary_column.hpp" +#include "storage/reference_column.hpp" +#include "storage/value_column.hpp" + +namespace opossum { + +// We build a row based protobuf response table. We iterate one column chunk by chunk, then the next column chunk by +// chunk etc. The column values become processed via a visitor pattern. +class ResponseBuilder { + public: + void build_response(proto::Response& response, std::shared_ptr table) { + // Creates a new protobuf Table and links it to the Response object + const auto proto_table = response.mutable_response_table(); + + // Add all rows in one run so that they already exist when we iterate column-based later + for (size_t i = 0; i < table->row_count(); ++i) { + proto_table->add_row(); + } + + // Iterate a column chunk by chunk and apply a typed ColumnVisitor, then the next column etc. + for (size_t column_index = 0, column_count = table->col_count(); column_index < column_count; ++column_index) { + const auto& type = table->column_type(column_index); + + // Register column type and name + proto_table->add_column_type(type); + proto_table->add_column_name(table->column_name(column_index)); + + auto visitor = make_unique_by_column_type(type); + uint32_t row_index = 0u; + // Visit a specific column chunk by chunk + for (ChunkID chunk_id = 0; chunk_id < table->chunk_count(); ++chunk_id) { + const auto& chunk = table->get_chunk(chunk_id); + if (chunk.size() == 0) { + continue; + } + const auto column = chunk.get_column(column_index); + auto context = std::make_shared(proto_table, row_index); + column->visit(*visitor, context); + row_index = context->row_index; + } + } + } + + protected: + struct ResponseContext : ColumnVisitableContext { + ResponseContext(proto::Table* t, uint32_t i) : proto_table(t), row_index(i) {} + + proto::Table* proto_table; + uint32_t row_index; + }; + + template + class ResponseBuilderVisitor : public ColumnVisitable { + public: + void handle_value_column(BaseColumn& base_column, std::shared_ptr base_context) override { + auto context = std::static_pointer_cast(base_context); + const auto& column = static_cast&>(base_column); + const auto& values = column.values(); + auto row_index = context->row_index; + auto proto_table = context->proto_table; + + // Put every value of this column into the corresponding protobuf row + for (const auto& value : values) { + auto row = proto_table->mutable_row(row_index); + auto row_value = row->add_value(); + set_row_value(row_value, value); + row_index++; + } + context->row_index = row_index; + } + + void handle_dictionary_column(BaseColumn& base_column, + std::shared_ptr base_context) override { + auto context = std::static_pointer_cast(base_context); + const auto& column = static_cast&>(base_column); + auto row_index = context->row_index; + auto proto_table = context->proto_table; + const auto& attribute_vector = *column.attribute_vector(); + const auto dictionary = *column.dictionary(); + + for (size_t i = 0; i < attribute_vector.size(); ++i) { + const auto& value = dictionary[attribute_vector.get(i)]; + auto row = proto_table->mutable_row(row_index); + auto row_value = row->add_value(); + set_row_value(row_value, value); + row_index++; + } + context->row_index = row_index; + } + + void handle_reference_column(ReferenceColumn& column, + std::shared_ptr base_context) override { + auto context = std::static_pointer_cast(base_context); + const auto referenced_table = column.referenced_table(); + auto row_index = context->row_index; + auto proto_table = context->proto_table; + + for (const auto& row_id : *column.pos_list()) { + auto chunk_info = referenced_table->locate_row(row_id); + auto& chunk = referenced_table->get_chunk(chunk_info.first); + auto& referenced_column = *chunk.get_column(column.referenced_column_id()); + auto& value = referenced_column[chunk_info.second]; + auto row = proto_table->mutable_row(row_index); + auto row_value = row->add_value(); + set_row_value(row_value, type_cast(value)); + row_index++; + } + context->row_index = row_index; + } + + protected: + void set_row_value(proto::Variant* row_value, ::google::protobuf::int32 value) { row_value->set_value_int(value); } + void set_row_value(proto::Variant* row_value, ::google::protobuf::int64 value) { row_value->set_value_long(value); } + void set_row_value(proto::Variant* row_value, float value) { row_value->set_value_float(value); } + void set_row_value(proto::Variant* row_value, double value) { row_value->set_value_double(value); } + void set_row_value(proto::Variant* row_value, std::string value) { row_value->set_value_string(value); } + }; +}; + +} // namespace opossum diff --git a/src/lib/network/server.cpp b/src/lib/network/server.cpp new file mode 100644 index 0000000000..ea688f9543 --- /dev/null +++ b/src/lib/network/server.cpp @@ -0,0 +1,144 @@ +#include "server.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "operators/abstract_operator.hpp" +#include "operators/get_table.hpp" +#include "operators/import_csv.hpp" +#include "operators/print.hpp" +#include "operators/projection.hpp" +#include "operators/table_scan.hpp" +#include "scheduler/job_task.hpp" +#include "scheduler/node_queue_scheduler.hpp" +#include "scheduler/operator_task.hpp" +#include "storage/storage_manager.hpp" +#include "storage/table.hpp" + +namespace opossum { + +void Server::start(const ServerConfiguration& config, const bool waiting_server_thread) { + CurrentScheduler::set(std::make_shared(config.topology)); + + _skip_scheduler = config.skip_scheduler; + + grpc::ServerBuilder builder; + // Listen on the given address without any authentication mechanism. + builder.AddListeningPort(config.address, grpc::InsecureServerCredentials()); + // Register "_service" as the instance through which we'll communicate with + // clients. In this case it corresponds to an *asynchronous* service. + builder.RegisterService(&_service); + // Get hold of the completion queues used for the asynchronous communication + // with the gRPC runtime. We use one queue per thread. + for (size_t thread_id = 0; thread_id < config.num_listener_threads; ++thread_id) { + _cqs.push_back(builder.AddCompletionQueue()); + } + // Finally assemble the grpc server. + _server = builder.BuildAndStart(); + std::cout << "Server listening on " << config.address << " with " << config.num_listener_threads + << " listening threads" << std::endl; + + // Start worker threads + for (size_t thread_id = 0; thread_id < config.num_listener_threads; ++thread_id) { + _listener_threads.emplace_back(&Server::thread_func_handle_rpcs, this, thread_id); + } + + if (waiting_server_thread) { + _server->Wait(); + } +} + +void Server::stop() { + std::cout << std::endl << "Stopping server..." << std::endl; + + _server->Shutdown(); + + // Always shutdown the completion queue after the server. + for (auto& cq : _cqs) { + // Shutting down completion queue causes clean of created RequestHandler objects + cq->Shutdown(); + } + + CurrentScheduler::get()->finish(); + CurrentScheduler::set(nullptr); + + for (auto& thread : _listener_threads) { + thread.join(); + } + + std::cout << "Server stopped" << std::endl; +} + +// Every server-listener-thread starts in this function. +// +// A gRPC client request is processed as follows: +// 1. A new RequestHandler instance is created and registered to serve a new incoming client request +// 2. Next() is called on the completion queue, blocking until a client request is incoming or the queue is shutting +// down +// 3. When a valid request is incoming, the 'tag' is set to a RequestHandler instance pointer and request_ok +// is set to 'true' +// 4. A new RequestHandler instance is created to server a future client request. +// 5. The current request is parsed (creating OperatorTasks from Protocol Buffer objects) and every Task is scheduled +// 6. A final JobTask is created and scheduled which builds a response (materialize the response table) and notifies +// gRPC to send the response back to the client finally +// 7. When the client confirms that he received the response, the "request" is put again into the completion queue. +// When Next() delivers this object, calling has_new_request() on the RequestHandler instance (referenced via the +// 'tag' pointer) results in 'false', so that the RequestHandler instance is deleted. +// To sum it up, every "request" is passing the completion queue two times. First as a new incoming client +// request, second as a finished client request to become cleaned up. +// +// Using the RequestHandler instance pointer to tag, i.e. uniquely identify a client request is an elegant way to avoid +// additional data structures. One could use something else (e.g. a UUID) instead of the RequestHandler instance pointer +// to tag a request. But then we need a mapping from UUID to the smart-pointered RequestHandler instance. This would be +// no improvement in avoiding leaks, because we are still depending on gRPC's leak free completion queue processing. +void Server::thread_func_handle_rpcs(const size_t thread_index) { + // Initially spawn one new RequestHandler instance to serve the first client request. + create_and_register_request_handler(thread_index); + + void* tag; // uniquely identifies a request. + bool request_ok; + + // Block waiting to read the next event from the completion queue. The event is uniquely identified by its tag, which + // in this case is the memory address of a RequestHandler instance. + // The return value of Next() should always be checked, because it tells us whether there will be further events in + // the queue to become processed. + // When the queue is shut down and empty, Next() returns 'false'. Otherwise, calling Next() still returns 'true' + // until the queue is empty, but 'request_ok' is set to 'false'. This is needed to clean up all remaining + // RequestHandler instances in the queue properly. + while (_cqs[thread_index]->Next(&tag, &request_ok)) { + RequestHandler* request_handler = static_cast(tag); + if (request_handler->has_new_request() && request_ok) { + request_handler->set_new_request_flag(false); + // Create new RequestHandler to serve a future incoming client request + create_and_register_request_handler(thread_index); + // Start processing the current request + request_handler->parse_and_schedule(_skip_scheduler); + } else { + // Response was already sent to client and the request is completed or CompletionQueue was shut down + // Clean up allocated resources of the request in server + delete request_handler; + } + } +} + +void Server::create_and_register_request_handler(const size_t thread_index) { + // Spawn a new "empty" RequestHandler instance. + // The instance will be deleted after response was sent back to the client or CompletionQueue was shut down + RequestHandler* request_handler = new RequestHandler(); + + // We register the "empty" RequestHandler instance to serve a new client request. The request_handler pointer acts as + // the tag uniquely identifying the request (so that different RequestHandler instances can serve different requests + // concurrently), in this case the memory address of this RequestHandler instance. + grpc::ServerCompletionQueue* completion_queue = _cqs[thread_index].get(); + _service.RequestQuery(&request_handler->get_server_context(), &request_handler->get_request(), + &request_handler->get_responder(), completion_queue, completion_queue, request_handler); +} + +} // namespace opossum diff --git a/src/lib/network/server.hpp b/src/lib/network/server.hpp new file mode 100644 index 0000000000..f6c11ebca9 --- /dev/null +++ b/src/lib/network/server.hpp @@ -0,0 +1,49 @@ +#pragma once + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-parameter" +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" +#include +#pragma GCC diagnostic pop +#include + +#include +#include +#include +#include + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" +#pragma GCC diagnostic ignored "-Wunused-parameter" +#include "network/generated/opossum.grpc.pb.h" +#pragma GCC diagnostic pop +#include "network/operator_translator.hpp" +#include "network/request_handler.hpp" +#include "network/response_builder.hpp" +#include "network/server_configuration.hpp" + +namespace opossum { + +class Server { + public: + /** + * Start the server + * @param config A config to specify server and scheduler parameters + * @param waiting_server_thread If true, the thread calling this function (normally the main-thread) will be blocking + * after server start until shutdown. `false` is used in server tests. + */ + void start(const ServerConfiguration& config, const bool waiting_server_thread = true); + void stop(); + + protected: + void thread_func_handle_rpcs(const size_t thread_index); + void create_and_register_request_handler(const size_t thread_index); + + std::vector> _cqs; + proto::OpossumService::AsyncService _service; + std::unique_ptr _server; + std::vector _listener_threads; + bool _skip_scheduler; +}; + +} // namespace opossum diff --git a/src/lib/network/server_configuration.hpp b/src/lib/network/server_configuration.hpp new file mode 100644 index 0000000000..ccbf8d9bc7 --- /dev/null +++ b/src/lib/network/server_configuration.hpp @@ -0,0 +1,18 @@ +#pragma once + +#include +#include + +#include "scheduler/topology.hpp" + +namespace opossum { + +// See Server::start() +struct ServerConfiguration { + std::string address; + size_t num_listener_threads; + bool skip_scheduler; + std::shared_ptr topology; +}; + +} // namespace opossum diff --git a/src/lib/operators/abstract_operator.hpp b/src/lib/operators/abstract_operator.hpp index 576df428eb..64915a5325 100644 --- a/src/lib/operators/abstract_operator.hpp +++ b/src/lib/operators/abstract_operator.hpp @@ -22,6 +22,17 @@ class TransactionContext; // succeed if execute was called before. Otherwise, a nullptr or an empty table could be returned. // // Operators shall not be executed twice. +// +// In order to use new operators in server mode, the following steps have to be performed: +// 1. Add a new Operator definition in Protobuf file: `src/lib/network/protos/opossum.proto` and add it to the +// enumeration in `OperatorVariant` in this file +// 2. Run `premake4 protoc` to generate header- and cpp-files for protocol buffer operators +// 3. Add a method to class OperatorTranslator in `src/lib/network/operator_translator.cpp` and +// `src/lib/network/operator_translator.hpp` that transforms the protocol buffer objects into the corresponding +// opossum operator +// 4. Add an entry in the swith-case of OperatorTranslator::translate_proto() to dispatch calls to the method created +// in step 3 +// 5. Write a test in `src/test/network/operator_translator_test.cpp` class AbstractOperator { public: AbstractOperator(const std::shared_ptr left = nullptr, diff --git a/src/lib/scheduler/worker.cpp b/src/lib/scheduler/worker.cpp index 22db2fc926..c5e884cc81 100644 --- a/src/lib/scheduler/worker.cpp +++ b/src/lib/scheduler/worker.cpp @@ -21,7 +21,7 @@ namespace { * Uses a weak_ptr, because otherwise the ref-count of it would not reach zero within the main() scope of the program. */ thread_local std::weak_ptr this_thread_worker; -} +} // namespace namespace opossum { diff --git a/src/lib/storage/index/adaptive_radix_tree/adaptive_radix_tree_nodes.cpp b/src/lib/storage/index/adaptive_radix_tree/adaptive_radix_tree_nodes.cpp index e6bd149190..8576ea9a8c 100644 --- a/src/lib/storage/index/adaptive_radix_tree/adaptive_radix_tree_nodes.cpp +++ b/src/lib/storage/index/adaptive_radix_tree/adaptive_radix_tree_nodes.cpp @@ -165,16 +165,18 @@ BaseIndex::Iterator Node16::_delegate_to_child( BaseIndex::Iterator Node16::lower_bound(const AdaptiveRadixTreeIndex::BinaryComparable &key, size_t depth) const { return _delegate_to_child( - key, depth, [this](std::iterator_traits::iterator>::difference_type partial_key_pos, - AdaptiveRadixTreeIndex::BinaryComparable key, - size_t depth) { return _children[partial_key_pos]->lower_bound(key, depth); }); + key, depth, + [this](std::iterator_traits::iterator>::difference_type partial_key_pos, + AdaptiveRadixTreeIndex::BinaryComparable key, + size_t depth) { return _children[partial_key_pos]->lower_bound(key, depth); }); } BaseIndex::Iterator Node16::upper_bound(const AdaptiveRadixTreeIndex::BinaryComparable &key, size_t depth) const { return _delegate_to_child( - key, depth, [this](std::iterator_traits::iterator>::difference_type partial_key_pos, - AdaptiveRadixTreeIndex::BinaryComparable key, - size_t depth) { return _children[partial_key_pos]->upper_bound(key, depth); }); + key, depth, + [this](std::iterator_traits::iterator>::difference_type partial_key_pos, + AdaptiveRadixTreeIndex::BinaryComparable key, + size_t depth) { return _children[partial_key_pos]->upper_bound(key, depth); }); } BaseIndex::Iterator Node16::begin() const { return _children[0]->begin(); } diff --git a/src/lib/storage/index/group_key/variable_length_key_store.hpp b/src/lib/storage/index/group_key/variable_length_key_store.hpp index 5793c870d3..a185d59582 100644 --- a/src/lib/storage/index/group_key/variable_length_key_store.hpp +++ b/src/lib/storage/index/group_key/variable_length_key_store.hpp @@ -128,9 +128,7 @@ class VariableLengthKeyStore { template ::value>::type> IteratorBase(const IteratorBase &other) // NOLINT(runtime/explicit) - : _bytes_per_key(other._bytes_per_key), - _key_alignment(other._key_alignment), - _data(other._data) {} + : _bytes_per_key(other._bytes_per_key), _key_alignment(other._key_alignment), _data(other._data) {} private: /** diff --git a/src/test/base_test.cpp b/src/test/base_test.cpp index 7dfdb4206c..71d121aa6c 100644 --- a/src/test/base_test.cpp +++ b/src/test/base_test.cpp @@ -105,12 +105,12 @@ ::testing::AssertionResult BaseTest::_table_equal(const Table &tleft, const Tabl for (unsigned col = 0; col < left[row].size(); col++) { if (tleft.column_type(col) == "float") { EXPECT_EQ(tright.column_type(col), "float"); - EXPECT_NEAR(type_cast(left[row][col]), type_cast(right[row][col]), 0.0001) << "Row/Col:" << row - << "/" << col; + EXPECT_NEAR(type_cast(left[row][col]), type_cast(right[row][col]), 0.0001) + << "Row/Col:" << row << "/" << col; } else if (tleft.column_type(col) == "double") { EXPECT_EQ(tright.column_type(col), "double"); - EXPECT_NEAR(type_cast(left[row][col]), type_cast(right[row][col]), 0.0001) << "Row/Col:" << row - << "/" << col; + EXPECT_NEAR(type_cast(left[row][col]), type_cast(right[row][col]), 0.0001) + << "Row/Col:" << row << "/" << col; } else { EXPECT_EQ(left[row][col], right[row][col]) << "Row:" << row + 1 << " Col:" << col + 1; } diff --git a/src/test/network/operator_translator_test.cpp b/src/test/network/operator_translator_test.cpp new file mode 100644 index 0000000000..696f937d54 --- /dev/null +++ b/src/test/network/operator_translator_test.cpp @@ -0,0 +1,608 @@ +#include +#include +#include +#include +#include +#include +#include + +#include "../base_test.hpp" +#include "gtest/gtest.h" + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-parameter" +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" +#include "../../lib/network/generated/opossum.pb.h" +#pragma GCC diagnostic pop +#include "../../lib/network/operator_translator.hpp" +#include "../../lib/operators/abstract_operator.hpp" +#include "../../lib/operators/difference.hpp" +#include "../../lib/operators/export_binary.hpp" +#include "../../lib/operators/export_csv.hpp" +#include "../../lib/operators/get_table.hpp" +#include "../../lib/operators/import_csv.hpp" +#include "../../lib/operators/index_column_scan.hpp" +#include "../../lib/operators/join_nested_loop_a.hpp" +#include "../../lib/operators/print.hpp" +#include "../../lib/operators/product.hpp" +#include "../../lib/operators/projection.hpp" +#include "../../lib/operators/sort.hpp" +#include "../../lib/operators/table_scan.hpp" +#include "../../lib/operators/union_all.hpp" +#include "../../lib/storage/storage_manager.hpp" +#include "../../lib/storage/table.hpp" +#include "../../lib/types.hpp" + +namespace opossum { + +class OperatorTranslatorTest : public BaseTest { + protected: + void SetUp() override { + _test_table = load_table("src/test/tables/int_string2.tbl", 2); + StorageManager::get().add_table("TestTable", _test_table); + + std::shared_ptr test_table_int_float = load_table("src/test/tables/int_float.tbl", 5); + StorageManager::get().add_table("table_int_float", std::move(test_table_int_float)); + + std::shared_ptr
test_table_int_float_2 = load_table("src/test/tables/int_float2.tbl", 2); + StorageManager::get().add_table("table_int_float_2", std::move(test_table_int_float_2)); + + std::shared_ptr
test_table_int_float_3 = load_table("src/test/tables/int_float3.tbl", 2); + StorageManager::get().add_table("table_int_float_3", std::move(test_table_int_float_3)); + + std::shared_ptr
test_table_a = load_table("src/test/tables/int.tbl", 5); + StorageManager::get().add_table("table_a", std::move(test_table_a)); + + std::shared_ptr
test_table_b = load_table("src/test/tables/float.tbl", 2); + StorageManager::get().add_table("table_b", std::move(test_table_b)); + } + + std::shared_ptr
_test_table; +}; + +TEST_F(OperatorTranslatorTest, GetTable) { + auto msg = proto::OperatorVariant(); + proto::GetTableOperator* get_table_operator = msg.mutable_get_table(); + get_table_operator->set_table_name("TestTable"); + + OperatorTranslator translator; + auto& tasks = translator.build_tasks_from_proto(msg); + auto root_task = translator.root_task(); + ASSERT_EQ(tasks.size(), 1ul); + + auto task = tasks.at(0); + ASSERT_EQ(root_task, task); + const auto get_table = std::dynamic_pointer_cast(task->get_operator()); + ASSERT_TRUE(get_table); + get_table->execute(); + + EXPECT_EQ(get_table->get_output(), _test_table); +} + +TEST_F(OperatorTranslatorTest, ScanTableInt) { + std::shared_ptr
expected_result = load_table("src/test/tables/int_string_filtered.tbl", 2); + + auto msg = proto::OperatorVariant(); + proto::TableScanOperator* table_scan_operator = msg.mutable_table_scan(); + table_scan_operator->set_column_name("a"); + table_scan_operator->set_filter_operator("="); + proto::Variant* variant = table_scan_operator->mutable_value(); + variant->set_value_int(123); + proto::GetTableOperator* get_table_operator = table_scan_operator->mutable_input_operator()->mutable_get_table(); + get_table_operator->set_table_name("TestTable"); + + OperatorTranslator translator; + auto& tasks = translator.build_tasks_from_proto(msg); + auto root_task = translator.root_task(); + ASSERT_EQ(tasks.size(), 2ul); + + auto get_table_task = tasks.at(0); + std::shared_ptr get_table = std::dynamic_pointer_cast(get_table_task->get_operator()); + ASSERT_TRUE(get_table); + get_table->execute(); + + auto table_scan_task = tasks.at(1); + auto table_scan = std::dynamic_pointer_cast(table_scan_task->get_operator()); + ASSERT_TRUE(table_scan_task); + ASSERT_EQ(root_task, table_scan_task); + table_scan->execute(); + + EXPECT_TABLE_EQ(table_scan->get_output(), expected_result); +} + +TEST_F(OperatorTranslatorTest, ScanTableIntBetween) { + std::shared_ptr
expected_result = load_table("src/test/tables/int_string_filtered.tbl", 2); + + auto msg = proto::OperatorVariant(); + proto::TableScanOperator* table_scan_operator = msg.mutable_table_scan(); + table_scan_operator->set_column_name("a"); + table_scan_operator->set_filter_operator("BETWEEN"); + proto::Variant* variant = table_scan_operator->mutable_value(); + variant->set_value_int(122); + proto::Variant* variant2 = table_scan_operator->mutable_value2(); + variant2->set_value_int(124); + proto::GetTableOperator* get_table_operator = table_scan_operator->mutable_input_operator()->mutable_get_table(); + get_table_operator->set_table_name("TestTable"); + + OperatorTranslator translator; + auto& tasks = translator.build_tasks_from_proto(msg); + auto root_task = translator.root_task(); + ASSERT_EQ(tasks.size(), 2ul); + + auto get_table_task = tasks.at(0); + std::shared_ptr get_table = std::dynamic_pointer_cast(get_table_task->get_operator()); + ASSERT_TRUE(get_table); + get_table->execute(); + + auto table_scan_task = tasks.at(1); + auto table_scan = std::dynamic_pointer_cast(table_scan_task->get_operator()); + ASSERT_TRUE(table_scan_task); + ASSERT_EQ(root_task, table_scan_task); + table_scan->execute(); + + EXPECT_TABLE_EQ(table_scan->get_output(), expected_result); +} + +TEST_F(OperatorTranslatorTest, ScanTableString) { + std::shared_ptr
expected_result = load_table("src/test/tables/int_string_filtered.tbl", 2); + + auto msg = proto::OperatorVariant(); + proto::TableScanOperator* table_scan_operator = msg.mutable_table_scan(); + table_scan_operator->set_column_name("b"); + table_scan_operator->set_filter_operator("="); + proto::Variant* variant = table_scan_operator->mutable_value(); + variant->set_value_string("A"); + proto::GetTableOperator* get_table_operator = table_scan_operator->mutable_input_operator()->mutable_get_table(); + get_table_operator->set_table_name("TestTable"); + + OperatorTranslator translator; + auto& tasks = translator.build_tasks_from_proto(msg); + auto root_task = translator.root_task(); + ASSERT_EQ(tasks.size(), 2ul); + + auto get_table_task = tasks.at(0); + std::shared_ptr get_table = std::dynamic_pointer_cast(get_table_task->get_operator()); + ASSERT_TRUE(get_table); + get_table->execute(); + + auto table_scan_task = tasks.at(1); + auto table_scan = std::dynamic_pointer_cast(table_scan_task->get_operator()); + ASSERT_TRUE(table_scan_task); + ASSERT_EQ(root_task, table_scan_task); + table_scan->execute(); + + EXPECT_TABLE_EQ(table_scan->get_output(), expected_result); +} + +TEST_F(OperatorTranslatorTest, Projection) { + std::shared_ptr
expected_result = load_table("src/test/tables/int.tbl", 1); + + auto msg = proto::OperatorVariant(); + proto::ProjectionOperator* projection_operator = msg.mutable_projection(); + projection_operator->add_column_name("a"); + proto::GetTableOperator* get_table_operator = projection_operator->mutable_input_operator()->mutable_get_table(); + get_table_operator->set_table_name("TestTable"); + + OperatorTranslator translator; + auto& tasks = translator.build_tasks_from_proto(msg); + auto root_task = translator.root_task(); + ASSERT_EQ(tasks.size(), 2ul); + + auto get_table_task = tasks.at(0); + std::shared_ptr get_table = std::dynamic_pointer_cast(get_table_task->get_operator()); + ASSERT_TRUE(get_table); + get_table->execute(); + + auto projection_task = tasks.at(1); + auto projection = std::dynamic_pointer_cast(projection_task->get_operator()); + ASSERT_TRUE(projection); + ASSERT_EQ(root_task, projection_task); + projection->execute(); + + EXPECT_TABLE_EQ(projection->get_output(), expected_result); +} + +TEST_F(OperatorTranslatorTest, Product) { + std::shared_ptr
expected_result = load_table("src/test/tables/int_float_product.tbl", 3); + + auto msg = proto::OperatorVariant(); + proto::ProductOperator* product_operation = msg.mutable_product(); + product_operation->mutable_left_operator()->mutable_get_table()->set_table_name("table_a"); + product_operation->set_prefix_left("left."); + product_operation->mutable_right_operator()->mutable_get_table()->set_table_name("table_b"); + product_operation->set_prefix_right("right."); + + OperatorTranslator translator; + auto& tasks = translator.build_tasks_from_proto(msg); + auto root_task = translator.root_task(); + ASSERT_EQ(tasks.size(), 3ul); + + auto get_table_task_a = tasks.at(0); + auto get_table_a = std::dynamic_pointer_cast(get_table_task_a->get_operator()); + ASSERT_TRUE(get_table_a); + get_table_a->execute(); + + auto get_table_task_b = tasks.at(1); + auto get_table_b = std::dynamic_pointer_cast(get_table_task_b->get_operator()); + ASSERT_TRUE(get_table_b); + get_table_b->execute(); + + auto product_task = tasks.at(2); + auto product = std::dynamic_pointer_cast(product_task->get_operator()); + ASSERT_TRUE(product); + ASSERT_EQ(root_task, product_task); + product->execute(); + + EXPECT_TABLE_EQ(product->get_output(), expected_result); +} + +TEST_F(OperatorTranslatorTest, AscendingSort) { + std::shared_ptr
expected_result = load_table("src/test/tables/int_float_sorted.tbl", 2); + + auto msg = proto::OperatorVariant(); + proto::SortOperator* sort_operation = msg.mutable_sort(); + sort_operation->mutable_input_operator()->mutable_get_table()->set_table_name("table_int_float"); + sort_operation->set_column_name("a"); + sort_operation->set_ascending(true); + + OperatorTranslator translator; + auto& tasks = translator.build_tasks_from_proto(msg); + auto root_task = translator.root_task(); + ASSERT_EQ(tasks.size(), 2ul); + + auto get_table_task = tasks.at(0); + auto get_table = std::dynamic_pointer_cast(get_table_task->get_operator()); + ASSERT_TRUE(get_table); + get_table->execute(); + + auto sort_task = tasks.at(1); + auto sort = std::dynamic_pointer_cast(sort_task->get_operator()); + ASSERT_TRUE(sort); + ASSERT_EQ(root_task, sort_task); + sort->execute(); + + EXPECT_TABLE_EQ(sort->get_output(), expected_result, true); +} + +TEST_F(OperatorTranslatorTest, DescendingSort) { + std::shared_ptr
expected_result = load_table("src/test/tables/int_float_reverse.tbl", 2); + + auto msg = proto::OperatorVariant(); + proto::SortOperator* sort_operation = msg.mutable_sort(); + sort_operation->mutable_input_operator()->mutable_get_table()->set_table_name("table_int_float"); + sort_operation->set_column_name("a"); + sort_operation->set_ascending(false); + + OperatorTranslator translator; + auto& tasks = translator.build_tasks_from_proto(msg); + auto root_task = translator.root_task(); + ASSERT_EQ(tasks.size(), 2ul); + + auto get_table_task = tasks.at(0); + auto get_table = std::dynamic_pointer_cast(get_table_task->get_operator()); + ASSERT_TRUE(get_table); + get_table->execute(); + + auto sort_task = tasks.at(1); + auto sort = std::dynamic_pointer_cast(sort_task->get_operator()); + ASSERT_TRUE(sort); + ASSERT_EQ(root_task, sort_task); + sort->execute(); + + EXPECT_TABLE_EQ(sort->get_output(), expected_result, true); +} + +TEST_F(OperatorTranslatorTest, UnionOfTables) { + std::shared_ptr
expected_result = load_table("src/test/tables/int_float_union.tbl", 2); + + auto msg = proto::OperatorVariant(); + proto::UnionAllOperator* union_all_operation = msg.mutable_union_all(); + union_all_operation->mutable_input_operator1()->mutable_get_table()->set_table_name("table_int_float"); + union_all_operation->mutable_input_operator2()->mutable_get_table()->set_table_name("table_int_float_2"); + + OperatorTranslator translator; + auto& tasks = translator.build_tasks_from_proto(msg); + auto root_task = translator.root_task(); + ASSERT_EQ(tasks.size(), 3ul); + + auto get_table_task_1 = tasks.at(0); + auto get_table_1 = std::dynamic_pointer_cast(get_table_task_1->get_operator()); + ASSERT_TRUE(get_table_1); + get_table_1->execute(); + + auto get_table_task_2 = tasks.at(1); + auto get_table_2 = std::dynamic_pointer_cast(get_table_task_2->get_operator()); + ASSERT_TRUE(get_table_2); + get_table_2->execute(); + + auto union_all_task = tasks.at(2); + auto union_all = std::dynamic_pointer_cast(union_all_task->get_operator()); + ASSERT_TRUE(union_all); + ASSERT_EQ(root_task, union_all_task); + union_all->execute(); + + EXPECT_TABLE_EQ(union_all->get_output(), expected_result); +} + +TEST_F(OperatorTranslatorTest, TableScanAndProjection) { + std::shared_ptr
expected_result = load_table("src/test/tables/int_string_filtered_projected.tbl", 2); + + auto msg = proto::OperatorVariant(); + proto::TableScanOperator* table_scan_operator = msg.mutable_table_scan(); + table_scan_operator->set_column_name("a"); + table_scan_operator->set_filter_operator("="); + proto::Variant* variant = table_scan_operator->mutable_value(); + variant->set_value_int(123); + proto::ProjectionOperator* projection_operator = table_scan_operator->mutable_input_operator()->mutable_projection(); + projection_operator->add_column_name("a"); + proto::GetTableOperator* get_table_operator = projection_operator->mutable_input_operator()->mutable_get_table(); + get_table_operator->set_table_name("TestTable"); + + OperatorTranslator translator; + auto& tasks = translator.build_tasks_from_proto(msg); + auto root_task = translator.root_task(); + ASSERT_EQ(tasks.size(), 3ul); + + auto get_table_task = tasks.at(0); + std::shared_ptr get_table = std::dynamic_pointer_cast(get_table_task->get_operator()); + ASSERT_TRUE(get_table); + get_table->execute(); + + auto projection_task = tasks.at(1); + auto projection = std::dynamic_pointer_cast(projection_task->get_operator()); + ASSERT_TRUE(projection); + projection->execute(); + + auto table_scan_task = tasks.at(2); + auto table_scan = std::dynamic_pointer_cast(table_scan_task->get_operator()); + ASSERT_TRUE(table_scan_task); + ASSERT_EQ(root_task, table_scan_task); + table_scan->execute(); + + EXPECT_TABLE_EQ(table_scan->get_output(), expected_result); +} + +TEST_F(OperatorTranslatorTest, ImportCsv) { + auto msg = proto::OperatorVariant(); + proto::ImportCsvOperator* import_csv_operator = msg.mutable_import_csv(); + import_csv_operator->set_directory("src/test/csv"); + import_csv_operator->set_filename("float"); + + OperatorTranslator translator; + auto& tasks = translator.build_tasks_from_proto(msg); + auto root_task = translator.root_task(); + ASSERT_EQ(tasks.size(), 1ul); + + auto task = tasks.at(0); + ASSERT_EQ(root_task, task); + const auto import_csv = std::dynamic_pointer_cast(task->get_operator()); + ASSERT_TRUE(import_csv); + import_csv->execute(); + + auto expected_table = load_table("src/test/tables/float.tbl", 5); + EXPECT_TABLE_EQ(import_csv->get_output(), expected_table); +} + +TEST_F(OperatorTranslatorTest, ExportCsv) { + auto msg = proto::OperatorVariant(); + proto::ExportCsvOperator* export_operator = msg.mutable_export_csv(); + export_operator->set_directory("src/test/csv"); + export_operator->set_filename("tmp_float"); + export_operator->mutable_input_operator()->mutable_get_table()->set_table_name("table_int_float"); + + OperatorTranslator translator; + auto& tasks = translator.build_tasks_from_proto(msg); + auto root_task = translator.root_task(); + ASSERT_EQ(tasks.size(), 2ul); + + auto get_table_task = tasks.at(0); + auto get_table = std::dynamic_pointer_cast(get_table_task->get_operator()); + ASSERT_TRUE(get_table); + + auto task = tasks.at(1); + ASSERT_EQ(root_task, task); + const auto export_csv = std::dynamic_pointer_cast(task->get_operator()); + ASSERT_TRUE(export_csv); +} + +TEST_F(OperatorTranslatorTest, ExportBinary) { + auto msg = proto::OperatorVariant(); + proto::ExportBinaryOperator* export_operator = msg.mutable_export_binary(); + export_operator->set_filename("tmp_float"); + export_operator->mutable_input_operator()->mutable_get_table()->set_table_name("table_int_float"); + + OperatorTranslator translator; + auto& tasks = translator.build_tasks_from_proto(msg); + auto root_task = translator.root_task(); + ASSERT_EQ(tasks.size(), 2ul); + + auto get_table_task = tasks.at(0); + auto get_table = std::dynamic_pointer_cast(get_table_task->get_operator()); + ASSERT_TRUE(get_table); + + auto task = tasks.at(1); + ASSERT_EQ(root_task, task); + const auto export_csv = std::dynamic_pointer_cast(task->get_operator()); + ASSERT_TRUE(export_csv); +} + +TEST_F(OperatorTranslatorTest, Print) { + auto msg = proto::OperatorVariant(); + auto print_operator = msg.mutable_print(); + print_operator->mutable_input_operator()->mutable_get_table()->set_table_name("table_int_float"); + + OperatorTranslator translator; + auto& tasks = translator.build_tasks_from_proto(msg); + auto root_task = translator.root_task(); + ASSERT_EQ(tasks.size(), 2ul); + + auto get_table_task = tasks.at(0); + auto get_table = std::dynamic_pointer_cast(get_table_task->get_operator()); + ASSERT_TRUE(get_table); + get_table->execute(); + + auto task = tasks.at(1); + ASSERT_EQ(root_task, task); + const auto print = std::dynamic_pointer_cast(task->get_operator()); + ASSERT_TRUE(print); + // suppress printing + std::ostringstream local; + auto cout_buf = std::cout.rdbuf(); + std::cout.rdbuf(local.rdbuf()); + print->execute(); + std::cout.rdbuf(cout_buf); + + auto expected_table = load_table("src/test/tables/int_float.tbl", 5); + EXPECT_TABLE_EQ(print->get_output(), expected_table); +} + +TEST_F(OperatorTranslatorTest, Difference) { + std::shared_ptr
expected_result = load_table("src/test/tables/int_float_filtered2.tbl", 2); + + auto msg = proto::OperatorVariant(); + auto difference_operator = msg.mutable_difference(); + difference_operator->mutable_left_operator()->mutable_get_table()->set_table_name("table_int_float"); + difference_operator->mutable_right_operator()->mutable_get_table()->set_table_name("table_int_float_3"); + + OperatorTranslator translator; + auto& tasks = translator.build_tasks_from_proto(msg); + auto root_task = translator.root_task(); + ASSERT_EQ(tasks.size(), 3ul); + + auto get_left_table_task = tasks.at(0); + auto get_left_table = std::dynamic_pointer_cast(get_left_table_task->get_operator()); + ASSERT_TRUE(get_left_table); + get_left_table->execute(); + + auto get_right_table_task = tasks.at(1); + auto get_right_table = std::dynamic_pointer_cast(get_right_table_task->get_operator()); + ASSERT_TRUE(get_right_table); + get_right_table->execute(); + + auto difference_task = tasks.at(2); + auto difference = std::dynamic_pointer_cast(difference_task->get_operator()); + ASSERT_TRUE(difference_task); + ASSERT_EQ(root_task, difference_task); + difference->execute(); + + EXPECT_TABLE_EQ(difference->get_output(), expected_result); +} + +TEST_F(OperatorTranslatorTest, IndexColumnScanInt) { + std::shared_ptr
expected_result = load_table("src/test/tables/int_string_filtered.tbl", 2); + + auto msg = proto::OperatorVariant(); + proto::IndexColumnScanOperator* index_column_scan_operator = msg.mutable_index_column_scan(); + index_column_scan_operator->set_column_name("a"); + index_column_scan_operator->set_filter_operator("="); + proto::Variant* variant = index_column_scan_operator->mutable_value(); + variant->set_value_int(123); + proto::GetTableOperator* get_table_operator = + index_column_scan_operator->mutable_input_operator()->mutable_get_table(); + get_table_operator->set_table_name("TestTable"); + + OperatorTranslator translator; + auto& tasks = translator.build_tasks_from_proto(msg); + auto root_task = translator.root_task(); + ASSERT_EQ(tasks.size(), 2ul); + + auto get_table_task = tasks.at(0); + std::shared_ptr get_table = std::dynamic_pointer_cast(get_table_task->get_operator()); + ASSERT_TRUE(get_table); + get_table->execute(); + + auto index_column_scan_task = tasks.at(1); + auto index_column_scan = std::dynamic_pointer_cast(index_column_scan_task->get_operator()); + ASSERT_TRUE(index_column_scan_task); + ASSERT_EQ(root_task, index_column_scan_task); + index_column_scan->execute(); + + EXPECT_TABLE_EQ(index_column_scan->get_output(), expected_result); +} + +TEST_F(OperatorTranslatorTest, DISABLED_NestedLoopJoinModes) { + auto msg = proto::OperatorVariant(); + proto::NestedLoopJoinOperator* join_operation = msg.mutable_nested_loop_join(); + join_operation->mutable_left_operator()->mutable_get_table()->set_table_name("table_int_float"); + join_operation->set_prefix_left("left"); + join_operation->mutable_right_operator()->mutable_get_table()->set_table_name("table_int_float_2"); + join_operation->set_prefix_right("right"); + join_operation->set_op("="); + join_operation->set_left_column_name("a"); + join_operation->set_right_column_name("a"); + + auto modes = {proto::NestedLoopJoinOperator::Inner, proto::NestedLoopJoinOperator::Left, + proto::NestedLoopJoinOperator::Right, proto::NestedLoopJoinOperator::Outer, + proto::NestedLoopJoinOperator::Cross, proto::NestedLoopJoinOperator::Natural, + proto::NestedLoopJoinOperator::Self}; + + for (auto mode : modes) { + join_operation->set_mode(mode); + + OperatorTranslator translator; + auto& tasks = translator.build_tasks_from_proto(msg); + ASSERT_EQ(tasks.size(), 3ul); + auto join_task = tasks.at(2); + auto join = std::dynamic_pointer_cast(join_task->get_operator()); + ASSERT_TRUE(join_task); + } +} + +TEST_F(OperatorTranslatorTest, DISABLED_NestedLoopJoinWithColumns) { + std::shared_ptr
expected_result = load_table("src/test/tables/int_left_join.tbl", 1); + + auto msg = proto::OperatorVariant(); + proto::NestedLoopJoinOperator* join_operation = msg.mutable_nested_loop_join(); + join_operation->mutable_left_operator()->mutable_get_table()->set_table_name("table_int_float"); + join_operation->set_prefix_left("left"); + join_operation->mutable_right_operator()->mutable_get_table()->set_table_name("table_int_float_2"); + join_operation->set_prefix_right("right"); + join_operation->set_mode(proto::NestedLoopJoinOperator::Left); + join_operation->set_op("="); + join_operation->set_left_column_name("a"); + join_operation->set_right_column_name("a"); + + OperatorTranslator translator; + auto& tasks = translator.build_tasks_from_proto(msg); + auto root_task = translator.root_task(); + ASSERT_EQ(tasks.size(), 3ul); + + auto get_left_table_task = tasks.at(0); + std::shared_ptr get_left_table = std::dynamic_pointer_cast(get_left_table_task->get_operator()); + ASSERT_TRUE(get_left_table); + get_left_table->execute(); + + auto get_right_table_task = tasks.at(1); + std::shared_ptr get_right_table = std::dynamic_pointer_cast(get_right_table_task->get_operator()); + ASSERT_TRUE(get_right_table); + get_right_table->execute(); + + auto join_task = tasks.at(2); + auto join = std::dynamic_pointer_cast(join_task->get_operator()); + ASSERT_TRUE(join_task); + ASSERT_EQ(root_task, join_task); + join->execute(); + + EXPECT_TABLE_EQ(join->get_output(), expected_result); +} + +TEST_F(OperatorTranslatorTest, ProjectionMissingInput) { + auto msg = proto::OperatorVariant(); + proto::ProjectionOperator* projection_operator = msg.mutable_projection(); + projection_operator->add_column_name("a"); + + OperatorTranslator translator; + EXPECT_THROW(translator.build_tasks_from_proto(msg), std::runtime_error); +} + +TEST_F(OperatorTranslatorTest, ProjectionIncompleteInput) { + auto msg = proto::OperatorVariant(); + proto::ProjectionOperator* projection_operator = msg.mutable_projection(); + projection_operator->add_column_name("a"); + projection_operator->mutable_input_operator(); + + OperatorTranslator translator; + EXPECT_THROW(translator.build_tasks_from_proto(msg), std::runtime_error); +} + +} // namespace opossum diff --git a/src/test/network/response_builder_test.cpp b/src/test/network/response_builder_test.cpp new file mode 100644 index 0000000000..3df1dfa9ed --- /dev/null +++ b/src/test/network/response_builder_test.cpp @@ -0,0 +1,92 @@ +#include +#include +#include +#include +#include +#include +#include + +#include "../base_test.hpp" +#include "gtest/gtest.h" + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-parameter" +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" +#include "../../lib/network/generated/opossum.grpc.pb.h" +#pragma GCC diagnostic pop +#include "../../lib/network/response_builder.hpp" +#include "../../lib/operators/abstract_operator.hpp" +#include "../../lib/operators/get_table.hpp" +#include "../../lib/operators/print.hpp" +#include "../../lib/operators/table_scan.hpp" +#include "../../lib/storage/storage_manager.hpp" +#include "../../lib/storage/table.hpp" +#include "../../lib/types.hpp" + +namespace { +std::string load_response(const std::string &file_name) { + std::ifstream infile(file_name); + std::string line; + std::stringstream response_text; + + while (std::getline(infile, line)) { + response_text << line << std::endl; + } + + return response_text.str(); +} +} // namespace + +namespace opossum { + +class ResponseBuilderTest : public BaseTest { + protected: + void SetUp() override { + auto test_table = load_table("src/test/tables/int_float.tbl", 2); + StorageManager::get().add_table("table_a", std::move(test_table)); + _gt = std::make_shared("table_a"); + _gt->execute(); + + auto test_table_dict = load_table("src/test/tables/int_float.tbl", 2); + test_table_dict->compress_chunk(0); + test_table_dict->compress_chunk(1); + StorageManager::get().add_table("table_dict", std::move(test_table_dict)); + + _gt_dict = std::make_shared("table_dict"); + _gt_dict->execute(); + } + + std::shared_ptr _gt, _gt_dict; + ResponseBuilder _builder; +}; + +TEST_F(ResponseBuilderTest, BuildResponseValueColumn) { + proto::Response response; + auto expected_result = load_response("src/test/responses/int_float.tbl.rsp"); + + _builder.build_response(response, _gt->get_output()); + + EXPECT_EQ(response.DebugString(), expected_result); +} + +TEST_F(ResponseBuilderTest, BuildResponseDictColumn) { + proto::Response response; + auto expected_result = load_response("src/test/responses/int_float.tbl.rsp"); + + _builder.build_response(response, _gt_dict->get_output()); + + EXPECT_EQ(response.DebugString(), expected_result); +} + +TEST_F(ResponseBuilderTest, BuildResponseRefColumn) { + proto::Response response; + auto expected_result = load_response("src/test/responses/int_float_filtered_a_1234.tbl.rsp"); + + auto scan_1 = std::make_shared(_gt, "a", "=", 1234); + scan_1->execute(); + _builder.build_response(response, scan_1->get_output()); + + EXPECT_EQ(response.DebugString(), expected_result); +} + +} // namespace opossum diff --git a/src/test/network/server_test.cpp b/src/test/network/server_test.cpp new file mode 100644 index 0000000000..dec2f0f593 --- /dev/null +++ b/src/test/network/server_test.cpp @@ -0,0 +1,180 @@ +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-parameter" +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" +#include +#pragma GCC diagnostic pop +#include +#include +#include +#include +#include +#include + +#include "../base_test.hpp" +#include "gtest/gtest.h" + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-parameter" +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" +#include "network/generated/opossum.grpc.pb.h" +#pragma GCC diagnostic pop +#include "network/server.hpp" +#include "storage/storage_manager.hpp" +#include "storage/table.hpp" + +using grpc::Channel; +using grpc::ClientAsyncResponseReader; +using grpc::ClientContext; +using grpc::CompletionQueue; +using grpc::Status; + +namespace { +std::string load_response(const std::string& file_name) { + std::ifstream infile(file_name); + std::string line; + std::stringstream response_text; + + while (std::getline(infile, line)) { + response_text << line << std::endl; + } + + return response_text.str(); +} +} // namespace + +namespace opossum { + +class ServerTest : public BaseTest { + protected: + std::shared_ptr doAsyncRequest(proto::Request& request) { + std::unique_ptr _stub = + proto::OpossumService::NewStub(grpc::CreateChannel(address, grpc::InsecureChannelCredentials())); + + // Container for the data we expect from the server. + auto response = std::make_shared(); + + // Context for the client. It could be used to convey extra information to + // the server and/or tweak certain RPC behaviors. + ClientContext context; + + // The producer-consumer queue we use to communicate asynchronously with the + // gRPC runtime. + CompletionQueue cq; + + // Storage for the status of the RPC upon completion. + Status status; + + // _stub->AsyncQuery() performs the RPC call, returning an instance we + // store in "rpc". Because we are using the asynchronous API, we need to + // hold on to the "rpc" instance in order to get updates on the ongoing RPC. + std::unique_ptr> rpc(_stub->AsyncQuery(&context, request, &cq)); + + // Request that, upon completion of the RPC, "response" be updated with the + // server's response; "status" with the indication of whether the op_variant + // was successful. Tag the request with the integer 1. + rpc->Finish(response.get(), &status, reinterpret_cast(1)); + void* got_tag; + bool ok = false; + // Block until the next result is available in the completion queue "cq". + // The return value of Next should always be checked. This return value + // tells us whether there is any kind of event or the _cq is shutting down. + EXPECT_TRUE(cq.Next(&got_tag, &ok)); + + // Verify that the result from "cq" corresponds, by its tag, our previous + // request. + EXPECT_TRUE(got_tag == reinterpret_cast(1)); + // ... and that the request was completed successfully. Note that "ok" + // corresponds solely to the request for updates introduced by Finish(). + EXPECT_TRUE(ok); + + return response; + } + + std::shared_ptr doRequest(proto::Request& request) { + std::unique_ptr _stub = + proto::OpossumService::NewStub(grpc::CreateChannel(address, grpc::InsecureChannelCredentials())); + + // Container for the data we expect from the server. + auto response = std::make_shared(); + + // Context for the client. It could be used to convey extra information to + // the server and/or tweak certain RPC behaviors. + ClientContext context; + + // The actual RPC. + Status status = _stub->Query(&context, request, response.get()); + + EXPECT_TRUE(status.ok()); + + return response; + } + + void SetUp() override { + auto test_table = load_table("src/test/tables/float.tbl", 2); + StorageManager::get().add_table(table_name, std::move(test_table)); + + config.address = address; + config.num_listener_threads = 1; + config.skip_scheduler = false; + config.topology = opossum::Topology::create_fake_numa_topology(8, 4); + } + + void TearDown() override { server.stop(); } + + std::string address{"0.0.0.0:50052"}; + std::string table_name{"table1"}; + Server server; + ServerConfiguration config; +}; + +TEST_F(ServerTest, SendQuerySkipScheduler) { + config.skip_scheduler = true; + server.start(config, false); + + proto::Request request; + + auto response = doRequest(request); + + EXPECT_EQ(response->DebugString(), ""); +} + +TEST_F(ServerTest, SendNoop) { + server.start(config, false); + + proto::Request request; + auto response = doRequest(request); + + EXPECT_EQ(response->DebugString(), ""); +} + +TEST_F(ServerTest, SendAsyncQuery) { + server.start(config, false); + + proto::Request request; + auto root_op_variant = request.mutable_root_operator(); + auto projection = root_op_variant->mutable_projection(); + projection->add_column_name("a"); + auto get_table = projection->mutable_input_operator()->mutable_get_table(); + get_table->set_table_name(table_name); + + auto response = doAsyncRequest(request); + + auto expected_result = load_response("src/test/responses/float.tbl.rsp"); + + EXPECT_EQ(response->DebugString(), expected_result); +} + +TEST_F(ServerTest, SendInvalidQuery) { + server.start(config, false); + + proto::Request request; + request.mutable_root_operator(); + + auto response = doRequest(request); + + EXPECT_FALSE(response->has_response_table()); + EXPECT_EQ(response->error(), + "Operator not set. Missing dependency. Cannot translate proto object to opossum operator."); +} + +} // namespace opossum diff --git a/src/test/responses/float.tbl.rsp b/src/test/responses/float.tbl.rsp new file mode 100644 index 0000000000..760b8f765a --- /dev/null +++ b/src/test/responses/float.tbl.rsp @@ -0,0 +1,24 @@ +response_table { + column_type: "float" + column_name: "a" + row { + value { + value_float: 1.1 + } + } + row { + value { + value_float: 2.2 + } + } + row { + value { + value_float: 3.3 + } + } + row { + value { + value_float: 4.4 + } + } +} diff --git a/src/test/responses/int_float.tbl.rsp b/src/test/responses/int_float.tbl.rsp new file mode 100644 index 0000000000..8137287832 --- /dev/null +++ b/src/test/responses/int_float.tbl.rsp @@ -0,0 +1,30 @@ +response_table { + column_type: "int" + column_type: "float" + column_name: "a" + column_name: "b" + row { + value { + value_int: 12345 + } + value { + value_float: 458.7 + } + } + row { + value { + value_int: 123 + } + value { + value_float: 456.7 + } + } + row { + value { + value_int: 1234 + } + value { + value_float: 457.7 + } + } +} diff --git a/src/test/responses/int_float_filtered_a_1234.tbl.rsp b/src/test/responses/int_float_filtered_a_1234.tbl.rsp new file mode 100644 index 0000000000..2fc16fb7e9 --- /dev/null +++ b/src/test/responses/int_float_filtered_a_1234.tbl.rsp @@ -0,0 +1,14 @@ +response_table { + column_type: "int" + column_type: "float" + column_name: "a" + column_name: "b" + row { + value { + value_int: 1234 + } + value { + value_float: 457.7 + } + } +} diff --git a/src/test/tables/int_string.tbl b/src/test/tables/int_string.tbl index d8aae08bc1..fdc7a69d63 100644 --- a/src/test/tables/int_string.tbl +++ b/src/test/tables/int_string.tbl @@ -11,4 +11,4 @@ int|string 18|test18 19|test19 20|test20 -21|test21 +21|test21 \ No newline at end of file diff --git a/src/test/tables/int_string2.tbl b/src/test/tables/int_string2.tbl new file mode 100644 index 0000000000..6dafbdd983 --- /dev/null +++ b/src/test/tables/int_string2.tbl @@ -0,0 +1,5 @@ +a|b +int|string +123|A +1234|B +12345|C \ No newline at end of file diff --git a/src/test/tables/int_string_filtered.tbl b/src/test/tables/int_string_filtered.tbl new file mode 100644 index 0000000000..e6d970a7de --- /dev/null +++ b/src/test/tables/int_string_filtered.tbl @@ -0,0 +1,3 @@ +a|b +int|string +123|A \ No newline at end of file diff --git a/src/test/tables/int_string_filtered_projected.tbl b/src/test/tables/int_string_filtered_projected.tbl new file mode 100644 index 0000000000..0db28c0e22 --- /dev/null +++ b/src/test/tables/int_string_filtered_projected.tbl @@ -0,0 +1,3 @@ +a +int +123 \ No newline at end of file diff --git a/third_party/grpc b/third_party/grpc new file mode 160000 index 0000000000..6040b471bc --- /dev/null +++ b/third_party/grpc @@ -0,0 +1 @@ +Subproject commit 6040b471bcd1d6bb05b25c126b6545180a1d3528