From 220744e928b4b6a2f78ca08b16c6389cc620f2d4 Mon Sep 17 00:00:00 2001
From: lihangyu <15605149486@163.com>
Date: Thu, 24 Oct 2024 20:13:57 +0800
Subject: [PATCH 01/10] [Opt](config) adjust segment cache (#42308)

For wide tables, the default segment cache capacity is relatively small
and leads to many cache evictions under high compaction/query load.
---
 be/src/common/config.cpp | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index ca339ae574e2c5..31170b731f4e75 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1072,8 +1072,8 @@ DEFINE_mInt32(schema_cache_sweep_time_sec, "100");
 // max number of segment cache, default -1 for backward compatibility fd_number*2/5
 DEFINE_Int32(segment_cache_capacity, "-1");
 DEFINE_Int32(segment_cache_fd_percentage, "20");
-DEFINE_mInt32(estimated_mem_per_column_reader, "1024");
-DEFINE_Int32(segment_cache_memory_percentage, "2");
+DEFINE_mInt32(estimated_mem_per_column_reader, "512");
+DEFINE_Int32(segment_cache_memory_percentage, "5");

 // enable feature binlog, default false
 DEFINE_Bool(enable_feature_binlog, "false");

From bd9d58d42e389ac8bcd422fab424911f890f3b94 Mon Sep 17 00:00:00 2001
From: Calvin Kirs
Date: Thu, 24 Oct 2024 20:46:15 +0800
Subject: [PATCH 02/10] [Feat](GA)automatically cherry-pick (#42435)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## Proposed changes

This PR helps us automatically cherry-pick commits from one PR to other
branches while preserving the original commit information. The specific
rules are as follows:

If a PR submitted to the main branch is merged and the label includes
‘dev/3.0.x’, it will automatically create a new branch, cherry-pick the
commit, and then create a PR. If conflicts arise, the original PR will
be tagged with a label.

## Test
https://github.com/CalvinKirs/incubator-doris/pull/42
https://github.com/CalvinKirs/incubator-doris/pull/40
https://github.com/CalvinKirs/incubator-doris/actions/workflows/auto-pick.yml

## todo
---
 .github/workflows/auto-cherry-pick.yml |  55 +++++--------
 tools/tools/auto-pick-script.py        | 105 +++++++++++++++++++++++++
 2 files changed, 126 insertions(+), 34 deletions(-)
 create mode 100644 tools/tools/auto-pick-script.py

diff --git a/.github/workflows/auto-cherry-pick.yml b/.github/workflows/auto-cherry-pick.yml
index 3683295a3fc195..1d59b52b6c9c56 100644
--- a/.github/workflows/auto-cherry-pick.yml
+++ b/.github/workflows/auto-cherry-pick.yml
@@ -15,49 +15,36 @@
 # specific language governing permissions and limitations
 # under the License.
# +name: Auto Cherry-Pick to Branch on: pull_request: + types: + - closed branches: - master - types: ["closed"] jobs: - cherry_pick_branch_2.1: + auto_cherry_pick: runs-on: ubuntu-latest - name: Cherry pick into branch-2.1 - if: ${{ contains(github.event.pull_request.labels.*.name, 'dev/2.1.x') && github.event.pull_request.merged == true }} - steps: - - name: Checkout - uses: actions/checkout@v3 - with: - fetch-depth: 0 - - name: Cherry pick into branch-2.1 - uses: carloscastrojumo/github-cherry-pick-action@v1.0.1 - with: - branch: branch-2.1 - labels: | - cherry-pick - reviewers: | - yiguolei - cherry_pick_branch-3.0: - runs-on: ubuntu-latest - name: Cherry pick into branch-3.0 if: ${{ contains(github.event.pull_request.labels.*.name, 'dev/3.0.x') && github.event.pull_request.merged == true }} steps: - - name: Checkout + - name: Checkout repository uses: actions/checkout@v3 + + - name: Set up Python + uses: actions/setup-python@v4 with: - fetch-depth: 0 - - name: Cherry pick into branch-3.0 - uses: carloscastrojumo/github-cherry-pick-action@v1.0.1 - with: - branch: branch-3.0 - labels: | - cherry-pick - reviewers: | - dataroaring - title: '[cherry-pick] {old_title}' - body: 'Cherry picking #{old_pull_request_id} onto this branch' -env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file + python-version: '3.x' + + - name: Install dependencies + run: | + pip install PyGithub + + - name: Auto cherry-pick + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + REPO_NAME: ${{ github.repository }} + CONFLICT_LABEL: cherry-pick-conflict-in-3.0 + run: | + python tools/auto-pick-script.py ${{ github.event.pull_request.number }} branch-3.0 diff --git a/tools/tools/auto-pick-script.py b/tools/tools/auto-pick-script.py new file mode 100644 index 00000000000000..2431c76c6235d4 --- /dev/null +++ b/tools/tools/auto-pick-script.py @@ -0,0 +1,105 @@ +#!/usr/bin/env python3 +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import os +import sys +from github import Github +import subprocess + +# Get GitHub personal access token and other parameters +GITHUB_TOKEN = os.getenv('GITHUB_TOKEN') +REPO_NAME = os.getenv('REPO_NAME', 'apache/doris') # Default repository name +CONFLICT_LABEL = os.getenv('CONFLICT_LABEL', 'cherry-pick-conflict') # Conflict label from environment variable + +# Check if the required command-line arguments are provided +if len(sys.argv) != 3: + print("Usage: python script.py ") + sys.exit(1) + +pr_number = int(sys.argv[1]) # PR number from command-line argument +TARGET_BRANCH = sys.argv[2] # Target branch from command-line argument + +# Create GitHub instance +g = Github(GITHUB_TOKEN) +repo = g.get_repo(REPO_NAME) + +# Get the specified PR +pr = repo.get_pull(pr_number) + +# Check if the PR has been merged +if not pr.merged: + print(f"PR #{pr_number} has not been merged yet.") + exit(1) + +merge_commit_sha = pr.merge_commit_sha + +# Get the latest commit from the target branch +base_branch = repo.get_branch(TARGET_BRANCH) + +# Create a new branch for cherry-picking the PR +new_branch_name = f'auto-pick-{pr.number}-{TARGET_BRANCH}' +repo.create_git_ref(ref=f'refs/heads/{new_branch_name}', sha=base_branch.commit.sha) +print(f"Created new branch {new_branch_name} from {TARGET_BRANCH}.") +subprocess.run(["git", "config", "--global", "credential.helper", "store"], check=True) + +# Clone the repository locally and switch to the new branch +repo_url = f"https://x-access-token:{GITHUB_TOKEN}@github.com/{REPO_NAME}.git" +subprocess.run(["git", "clone", repo_url]) +repo_dir = REPO_NAME.split("/")[-1] # Get the directory name +subprocess.run(["git", "checkout", new_branch_name], cwd=repo_dir) + +# Set Git user identity for commits +subprocess.run(["git", "config", "user.email", "your-email@example.com"], cwd=repo_dir) +subprocess.run(["git", "config", "user.name", "Your Name"], cwd=repo_dir) + + +# Execute the cherry-pick operation +try: + subprocess.run(["git", "cherry-pick", merge_commit_sha], cwd=repo_dir, check=True) + print(f"Successfully cherry-picked commit {merge_commit_sha} into {new_branch_name}.") + + # Check if the commit is present in the new branch + commit_check = subprocess.run( + ["git", "rev-list", "--count", f"{merge_commit_sha}"], + cwd=repo_dir, + capture_output=True, + text=True + ) + + if commit_check.returncode == 0 and int(commit_check.stdout.strip()) > 0: + # Push the new branch + subprocess.run(["git", "push", "origin", new_branch_name], cwd=repo_dir, check=True) + print(f"Pushed new branch {new_branch_name} to origin.") + + # Create a new PR for the cherry-picked changes + new_pr = repo.create_pull( + title=f"{TARGET_BRANCH}: {pr.title}", # Prefix with branch name + body=pr.body, # Keep the original PR body + head=new_branch_name, + base=TARGET_BRANCH + ) + + print(f"Created a new PR #{new_pr.number} for cherry-picked changes.") + else: + print(f"Commit {merge_commit_sha} was not found in {new_branch_name} after cherry-picking.") + +except subprocess.CalledProcessError: + print(f"Conflict occurred while cherry-picking commit {merge_commit_sha}.") + # Add conflict label + pr.add_to_labels(CONFLICT_LABEL) + print(f"Added label '{CONFLICT_LABEL}' to PR #{pr.number} due to conflict.") \ No newline at end of file From e1fa3c911b52731debbcc363079263d4b3761b8d Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Thu, 24 Oct 2024 20:54:20 +0800 Subject: [PATCH 03/10] [Chore](Codeowners)Update CODEOWNERS (#42437) ## Proposed changes Issue Number: close #xxx --- .github/CODEOWNERS | 
1 + 1 file changed, 1 insertion(+) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index b00919e4c8728d..238f80254b7d22 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -19,3 +19,4 @@ be/src/agent/be_exec_version_manager.cpp @BiteTheDDDDt fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @dataroaring @CalvinKirs @morningman **/pom.xml @CalvinKirs @morningman fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java @dataroaring @morningman @yiguolei @xiaokang +.github/workflows/* @CalvinKirs @morningman From ad699ed9aff42d2efa83e5a5e17f1906aaf16572 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Thu, 24 Oct 2024 21:28:05 +0800 Subject: [PATCH 04/10] [Fix](GA)Fix cherry-pick-script path (#42441) ## Proposed changes Issue Number: close #xxx --- tools/{tools => }/auto-pick-script.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename tools/{tools => }/auto-pick-script.py (100%) diff --git a/tools/tools/auto-pick-script.py b/tools/auto-pick-script.py similarity index 100% rename from tools/tools/auto-pick-script.py rename to tools/auto-pick-script.py From c7aa76ed801bec05bd923754b945317ba5cb8622 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Thu, 24 Oct 2024 21:53:21 +0800 Subject: [PATCH 05/10] Revert "[Chore](Codeowners)Update CODEOWNERS" (#42443) Reverts apache/doris#42437 Just test.. --- .github/CODEOWNERS | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 238f80254b7d22..b00919e4c8728d 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -19,4 +19,3 @@ be/src/agent/be_exec_version_manager.cpp @BiteTheDDDDt fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @dataroaring @CalvinKirs @morningman **/pom.xml @CalvinKirs @morningman fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java @dataroaring @morningman @yiguolei @xiaokang -.github/workflows/* @CalvinKirs @morningman From b6c1540d7797f6d02672b68b0948a100366e5374 Mon Sep 17 00:00:00 2001 From: walter Date: Thu, 24 Oct 2024 22:05:50 +0800 Subject: [PATCH 06/10] [improve](common) Add synchronized to avoid concurrent modification (#42384) --- .../java/org/apache/doris/common/MarkedCountDownLatch.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java b/fe/fe-core/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java index 6c109d7167b459..e1431c4d729c2d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java @@ -35,7 +35,7 @@ public MarkedCountDownLatch(int count) { marks = HashMultimap.create(); } - public void addMark(K key, V value) { + public synchronized void addMark(K key, V value) { marks.put(key, value); } @@ -65,7 +65,7 @@ public synchronized List> getLeftMarks() { return Lists.newArrayList(marks.entries()); } - public Status getStatus() { + public synchronized Status getStatus() { return st; } From 987a197e54186555a218ce6f83993078908d2fe2 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Thu, 24 Oct 2024 22:27:24 +0800 Subject: [PATCH 07/10] [Fix](GA)Add necessary permissions (#42446) ## Proposed changes Issue Number: close #xxx --- .github/workflows/auto-cherry-pick.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/auto-cherry-pick.yml b/.github/workflows/auto-cherry-pick.yml index 1d59b52b6c9c56..d1e8d3da76e149 100644 --- a/.github/workflows/auto-cherry-pick.yml 
+++ b/.github/workflows/auto-cherry-pick.yml @@ -23,7 +23,7 @@ on: - closed branches: - master - +permissions: write-all jobs: auto_cherry_pick: runs-on: ubuntu-latest From ef48470ff3e2fb0420c9f9434f57a70a8a8a8524 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Thu, 24 Oct 2024 22:42:36 +0800 Subject: [PATCH 08/10] [Fix](GA)Add necessary permissions (#42447) ## Proposed changes Issue Number: close #xxx --- .github/workflows/auto-cherry-pick.yml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/.github/workflows/auto-cherry-pick.yml b/.github/workflows/auto-cherry-pick.yml index d1e8d3da76e149..08ff3c7a0bb4aa 100644 --- a/.github/workflows/auto-cherry-pick.yml +++ b/.github/workflows/auto-cherry-pick.yml @@ -23,7 +23,11 @@ on: - closed branches: - master -permissions: write-all +permissions: + checks: write + contents: write + pull-requests: write + repository-projects: write jobs: auto_cherry_pick: runs-on: ubuntu-latest From 5f10b211b2b4bd1a73d2d388bbf7958dac0f1589 Mon Sep 17 00:00:00 2001 From: Socrates Date: Thu, 24 Oct 2024 22:49:57 +0800 Subject: [PATCH 09/10] [enhance](hive) support reading hive table with OpenCSVSerde (#42257) ## Proposed changes OpenCSVSerde Properties: | **Property** | **Description** | **Default Value** | **Supported in Doris** | |---------------------------------------|---------------------------------------------------------------------------------------------------|-------------------|--------------------------| | `separatorChar` | Defines the character used to separate fields (columns) in a CSV file. | `,` | Yes | | `quoteChar` | Defines the character used to quote fields that contain special characters, like the separator. | `"` | Yes | | `escapeChar` | Specifies the escape character used for escaping special characters, including quotes and delimiters. | `"` | Yes | ### Explanation: - **`separatorChar`**: This property defines the character that separates columns in the CSV file. Typically, a comma (`,`) is used as the default separator. - **`quoteChar`**: This character is used to enclose fields that contain special characters (like the separator). For example, if a field contains a comma, it is enclosed in quotes (`"`). - **`escapeChar`**: Specifies the character used to escape special characters, such as quotes or the separator. In many cases, a backslash (`\\`) is used as the escape character. 
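
As a rough sketch of what this change enables (the table, column, catalog, and database names below are illustrative only and are not part of this patch), a Hive table declared with OpenCSVSerde and non-default serde properties can be read from Doris through a Hive catalog, with `separatorChar`, `quoteChar`, and `escapeChar` taken from the table's SERDEPROPERTIES:

```sql
-- Hive side: hypothetical table using OpenCSVSerde with custom properties.
CREATE TABLE csv_events (
    id INT,
    payload STRING
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
    "separatorChar" = ";",
    "quoteChar"     = "'",
    "escapeChar"    = "|"
)
STORED AS TEXTFILE;

-- Doris side: query through a Hive catalog (hive_catalog / regression are
-- assumed names); the scan reads separatorChar/quoteChar/escapeChar from
-- the table's serde properties.
SELECT * FROM hive_catalog.regression.csv_events ORDER BY id;
```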
--- .../serde_prop/some_serde_table.hql | 64 +++++++++++++++++++ .../doris/datasource/hive/HiveProperties.java | 50 ++++++++------- .../datasource/hive/source/HiveScanNode.java | 44 +++++++++---- .../hive/test_hive_serde_prop.out | 23 +++++++ .../hive/test_hive_serde_prop.groovy | 4 ++ 5 files changed, 148 insertions(+), 37 deletions(-) diff --git a/docker/thirdparties/docker-compose/hive/scripts/data/regression/serde_prop/some_serde_table.hql b/docker/thirdparties/docker-compose/hive/scripts/data/regression/serde_prop/some_serde_table.hql index 4de85bc19f0b2f..0368547f8be224 100644 --- a/docker/thirdparties/docker-compose/hive/scripts/data/regression/serde_prop/some_serde_table.hql +++ b/docker/thirdparties/docker-compose/hive/scripts/data/regression/serde_prop/some_serde_table.hql @@ -106,3 +106,67 @@ insert into serde_test4 values(1, "abc"),(2, "def"); insert into serde_test5 values(1, "abc"),(2, "def"); insert into serde_test6 values(1, "abc"),(2, "def"); insert into serde_test7 values(1, null),(2, "|||"),(3, "aaa"),(4, "\"null\""); + +CREATE TABLE test_open_csv_default_prop ( + id INT, + name STRING, + age INT, + salary DOUBLE, + is_active BOOLEAN, + hire_date DATE, + last_login TIMESTAMP, + rating FLOAT, + description STRING +) +ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' +STORED AS TEXTFILE; + +CREATE TABLE test_open_csv_standard_prop ( + id INT, + name STRING, + age INT, + salary DOUBLE, + is_active BOOLEAN, + hire_date DATE, + last_login TIMESTAMP, + rating FLOAT, + description STRING +) +ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' +WITH SERDEPROPERTIES ( + "separatorChar" = ",", + "quoteChar" = "\"", + "escapeChar" = "\\" +) +STORED AS TEXTFILE; + +CREATE TABLE test_open_csv_custom_prop ( + id INT, + name STRING, + age INT, + salary DOUBLE, + is_active BOOLEAN, + hire_date DATE, + last_login TIMESTAMP, + rating FLOAT, + description STRING +) +ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' +WITH SERDEPROPERTIES ( + "separatorChar" = "\t", + "quoteChar" = "\'", + "escapeChar" = "|" +) +STORED AS TEXTFILE; + +INSERT INTO TABLE test_open_csv_default_prop VALUES +(1, 'John Doe', 28, 50000.75, true, '2022-01-15', '2023-10-21 14:30:00', 4.5, 'Senior Developer'), +(2, 'Jane,Smith', NULL, NULL, false, '2020-05-20', NULL, NULL, '\"Project Manager\"'); + +INSERT INTO TABLE test_open_csv_standard_prop VALUES +(1, 'John Doe', 28, 50000.75, true, '2022-01-15', '2023-10-21 14:30:00', 4.5, 'Senior Developer'), +(2, 'Jane,Smith', NULL, NULL, false, '2020-05-20', NULL, NULL, '\"Project Manager\"'); + +INSERT INTO TABLE test_open_csv_custom_prop VALUES +(1, 'John Doe', 28, 50000.75, true, '2022-01-15', '2023-10-21 14:30:00', 4.5, 'Senior Developer'), +(2, 'Jane,Smith', NULL, NULL, false, '2020-05-20', NULL, NULL, '\"Project Manager\"'); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveProperties.java index 5ded87e0d235a2..74f3dcc1a9d0e2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveProperties.java @@ -19,6 +19,7 @@ import com.google.common.collect.ImmutableSet; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.serde2.OpenCSVSerde; import java.util.HashMap; import java.util.Map; @@ -27,15 +28,12 @@ public class HiveProperties { public static final String PROP_FIELD_DELIMITER = "field.delim"; - public 
static final String PROP_SEPARATOR_CHAR = "separatorChar"; public static final String PROP_SERIALIZATION_FORMAT = "serialization.format"; public static final String DEFAULT_FIELD_DELIMITER = "\1"; // "\x01" public static final String PROP_LINE_DELIMITER = "line.delim"; public static final String DEFAULT_LINE_DELIMITER = "\n"; - public static final String PROP_QUOTE_CHAR = "quoteChar"; - public static final String PROP_COLLECTION_DELIMITER_HIVE2 = "colelction.delim"; public static final String PROP_COLLECTION_DELIMITER_HIVE3 = "collection.delim"; public static final String DEFAULT_COLLECTION_DELIMITER = "\2"; @@ -49,6 +47,14 @@ public class HiveProperties { public static final String PROP_NULL_FORMAT = "serialization.null.format"; public static final String DEFAULT_NULL_FORMAT = "\\N"; + // The following properties are used for OpenCsvSerde. + public static final String PROP_SEPARATOR_CHAR = OpenCSVSerde.SEPARATORCHAR; + public static final String DEFAULT_SEPARATOR_CHAR = ","; + public static final String PROP_QUOTE_CHAR = OpenCSVSerde.QUOTECHAR; + public static final String DEFAULT_QUOTE_CHAR = "\""; + public static final String PROP_ESCAPE_CHAR = OpenCSVSerde.ESCAPECHAR; + public static final String DEFAULT_ESCAPE_CHAR = "\\"; + public static final Set HIVE_SERDE_PROPERTIES = ImmutableSet.of( PROP_FIELD_DELIMITER, PROP_COLLECTION_DELIMITER_HIVE2, @@ -59,37 +65,33 @@ public class HiveProperties { PROP_QUOTE_CHAR, PROP_MAP_KV_DELIMITER, PROP_ESCAPE_DELIMITER, - PROP_NULL_FORMAT - ); + PROP_ESCAPE_CHAR, + PROP_NULL_FORMAT); public static String getFieldDelimiter(Table table) { // This method is used for text format. - // If you need compatibility with csv format, please use `getColumnSeparator`. Optional fieldDelim = HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_FIELD_DELIMITER); Optional serFormat = HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_SERIALIZATION_FORMAT); return HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault( DEFAULT_FIELD_DELIMITER, fieldDelim, serFormat)); } - public static String getColumnSeparator(Table table) { - Optional fieldDelim = HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_FIELD_DELIMITER); - Optional columnSeparator = HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_SEPARATOR_CHAR); - Optional serFormat = HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_SERIALIZATION_FORMAT); - return HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault( - DEFAULT_FIELD_DELIMITER, fieldDelim, columnSeparator, serFormat)); + public static String getSeparatorChar(Table table) { + Optional separatorChar = HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_SEPARATOR_CHAR); + return HiveMetaStoreClientHelper.firstPresentOrDefault( + DEFAULT_SEPARATOR_CHAR, separatorChar); } - public static String getLineDelimiter(Table table) { Optional lineDelim = HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_LINE_DELIMITER); return HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault( - DEFAULT_LINE_DELIMITER, lineDelim)); + DEFAULT_LINE_DELIMITER, lineDelim)); } public static String getMapKvDelimiter(Table table) { Optional mapkvDelim = HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_MAP_KV_DELIMITER); return HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault( - DEFAULT_MAP_KV_DELIMITER, mapkvDelim)); + DEFAULT_MAP_KV_DELIMITER, mapkvDelim)); } public static String getCollectionDelimiter(Table table) { @@ -101,14 +103,6 
@@ public static String getCollectionDelimiter(Table table) { DEFAULT_COLLECTION_DELIMITER, collectionDelimHive2, collectionDelimHive3)); } - public static Optional getQuoteChar(Table table) { - Map serdeParams = table.getSd().getSerdeInfo().getParameters(); - if (serdeParams.containsKey(PROP_QUOTE_CHAR)) { - return Optional.of(serdeParams.get(PROP_QUOTE_CHAR)); - } - return Optional.empty(); - } - public static Optional getEscapeDelimiter(Table table) { Optional escapeDelim = HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_ESCAPE_DELIMITER); if (escapeDelim.isPresent()) { @@ -127,6 +121,16 @@ public static String getNullFormat(Table table) { return HiveMetaStoreClientHelper.firstPresentOrDefault(DEFAULT_NULL_FORMAT, nullFormat); } + public static String getQuoteChar(Table table) { + Optional quoteChar = HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_QUOTE_CHAR); + return HiveMetaStoreClientHelper.firstPresentOrDefault(DEFAULT_QUOTE_CHAR, quoteChar); + } + + public static String getEscapeChar(Table table) { + Optional escapeChar = HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_ESCAPE_CHAR); + return HiveMetaStoreClientHelper.firstPresentOrDefault(DEFAULT_ESCAPE_CHAR, escapeChar); + } + // Set properties to table public static void setTableProperties(Table table, Map properties) { HashMap serdeProps = new HashMap<>(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java index f17de4bfe0a116..dbf1ea9cd9a4e4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java @@ -385,20 +385,36 @@ protected Map getLocationProperties() throws UserException { protected TFileAttributes getFileAttributes() throws UserException { TFileTextScanRangeParams textParams = new TFileTextScanRangeParams(); Table table = hmsTable.getRemoteTable(); - // 1. set column separator - textParams.setColumnSeparator(HiveProperties.getColumnSeparator(table)); - // 2. set line delimiter - textParams.setLineDelimiter(HiveProperties.getLineDelimiter(table)); - // 3. set mapkv delimiter - textParams.setMapkvDelimiter(HiveProperties.getMapKvDelimiter(table)); - // 4. set collection delimiter - textParams.setCollectionDelimiter(HiveProperties.getCollectionDelimiter(table)); - // 5. set quote char - HiveProperties.getQuoteChar(table).ifPresent(d -> textParams.setEnclose(d.getBytes()[0])); - // 6. set escape delimiter - HiveProperties.getEscapeDelimiter(table).ifPresent(d -> textParams.setEscape(d.getBytes()[0])); - // 7. set null format - textParams.setNullFormat(HiveProperties.getNullFormat(table)); + // TODO: separate hive text table and OpenCsv table + String serDeLib = table.getSd().getSerdeInfo().getSerializationLib(); + if (serDeLib.equals("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) { + // set properties of LazySimpleSerDe + // 1. set column separator + textParams.setColumnSeparator(HiveProperties.getFieldDelimiter(table)); + // 2. set line delimiter + textParams.setLineDelimiter(HiveProperties.getLineDelimiter(table)); + // 3. set mapkv delimiter + textParams.setMapkvDelimiter(HiveProperties.getMapKvDelimiter(table)); + // 4. set collection delimiter + textParams.setCollectionDelimiter(HiveProperties.getCollectionDelimiter(table)); + // 5. 
set escape delimiter + HiveProperties.getEscapeDelimiter(table).ifPresent(d -> textParams.setEscape(d.getBytes()[0])); + // 6. set null format + textParams.setNullFormat(HiveProperties.getNullFormat(table)); + } else if (serDeLib.equals("org.apache.hadoop.hive.serde2.OpenCSVSerde")) { + // set set properties of OpenCSVSerde + // 1. set column separator + textParams.setColumnSeparator(HiveProperties.getSeparatorChar(table)); + // 2. set line delimiter + textParams.setLineDelimiter(HiveProperties.getLineDelimiter(table)); + // 3. set enclose char + textParams.setEnclose(HiveProperties.getQuoteChar(table).getBytes()[0]); + // 4. set escape char + textParams.setEscape(HiveProperties.getEscapeChar(table).getBytes()[0]); + } else { + throw new UserException( + "unsupported hive table serde: " + serDeLib); + } TFileAttributes fileAttributes = new TFileAttributes(); fileAttributes.setTextParams(textParams); diff --git a/regression-test/data/external_table_p0/hive/test_hive_serde_prop.out b/regression-test/data/external_table_p0/hive/test_hive_serde_prop.out index a527c7b687d7ad..3d45e525ecf69c 100644 --- a/regression-test/data/external_table_p0/hive/test_hive_serde_prop.out +++ b/regression-test/data/external_table_p0/hive/test_hive_serde_prop.out @@ -39,6 +39,18 @@ b 2.2 3 aaa 4 "null" +-- !test_open_csv_default_prop -- +1 John Doe 28 50000.75 TRUE 2022-01-15 2023-10-21 14:30:00 4.5 Senior Developer +2 Jane,Smith FALSE 2020-05-20 ""Project Manager"" + +-- !test_open_csv_standard_prop -- +1 John Doe 28 50000.75 TRUE 2022-01-15 2023-10-21 14:30:00 4.5 Senior Developer +2 Jane,Smith FALSE 2020-05-20 "Project Manager" + +-- !test_open_csv_custom_prop -- +1 John Doe 28 50000.75 TRUE 2022-01-15 2023-10-21 14:30:00 4.5 Senior Developer +2 Jane,Smith FALSE 2020-05-20 "Project Manager" + -- !1 -- a 1.1 b 2.2 @@ -79,3 +91,14 @@ b 2.2 3 aaa 4 "null" +-- !test_open_csv_default_prop -- +1 John Doe 28 50000.75 TRUE 2022-01-15 2023-10-21 14:30:00 4.5 Senior Developer +2 Jane,Smith FALSE 2020-05-20 ""Project Manager"" + +-- !test_open_csv_standard_prop -- +1 John Doe 28 50000.75 TRUE 2022-01-15 2023-10-21 14:30:00 4.5 Senior Developer +2 Jane,Smith FALSE 2020-05-20 "Project Manager" + +-- !test_open_csv_custom_prop -- +1 John Doe 28 50000.75 TRUE 2022-01-15 2023-10-21 14:30:00 4.5 Senior Developer +2 Jane,Smith FALSE 2020-05-20 "Project Manager" diff --git a/regression-test/suites/external_table_p0/hive/test_hive_serde_prop.groovy b/regression-test/suites/external_table_p0/hive/test_hive_serde_prop.groovy index d0c191f7c67b45..52cdd25eb07b2c 100644 --- a/regression-test/suites/external_table_p0/hive/test_hive_serde_prop.groovy +++ b/regression-test/suites/external_table_p0/hive/test_hive_serde_prop.groovy @@ -51,6 +51,10 @@ suite("test_hive_serde_prop", "external_docker,hive,external_docker_hive,p0,exte hive_docker """truncate table regression.serde_test8;""" sql """insert into ${catalog_name}.regression.serde_test8 select * from ${catalog_name}.regression.serde_test7;""" qt_9 """select * from ${catalog_name}.regression.serde_test8 order by id;""" + + qt_test_open_csv_default_prop """select * from ${catalog_name}.regression.test_open_csv_default_prop order by id;""" + qt_test_open_csv_standard_prop """select * from ${catalog_name}.regression.test_open_csv_standard_prop order by id;""" + qt_test_open_csv_custom_prop """select * from ${catalog_name}.regression.test_open_csv_custom_prop order by id;""" } } From bf737b12530301cd3e2dd6d50918cc3051e76a7c Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 25 Oct 2024 
10:20:23 +0800 Subject: [PATCH 10/10] [Improvement](local shuffle) Improve local shuffle strategy (#41789) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add local shuffle to unpartitioned fragment to add parallel for perfomance ```sql SELECT h1.UserID, h2.URL, COUNT(*) AS visit_count FROM ( SELECT * FROM hits_10m LIMIT 5000 ) AS h1 CROSS JOIN ( SELECT * FROM hits_10m LIMIT 5000 ) AS h2 GROUP BY h1.UserID, h2.URL ORDER BY visit_count DESC LIMIT 1000 ``` Add a rule to apply local exchanger: ``` ┌───────────────────────┐ ┌───────────────────────┐ │ │ │ │ │Exchange(UNPARTITIONED)│ │Exchange(UNPARTITIONED)│ │ │ │ │ └───────────────────────┴──────┬────────┴───────────────────────┘ │ │ │ │ │ │ ┌──────▼──────┐ │ │ │ CROSS JOIN │ │ │ └──────┬──────┘ │ │ │ ┌──────────────────▼─────────────────────┐ │ │ │ LOCAL EXCHANGE (HASH PARTITION) 1 -> n │ │ │ └──────────────────┬─────────────────────┘ │ │ │ │ ▼ ┌──▼────┐ │ │ │ AGG │ │ │ └───────┘ ``` before: 1 min 17.79 sec after: 16.73 sec --- be/src/pipeline/dependency.h | 8 +- .../exec/aggregation_sink_operator.cpp | 5 +- .../pipeline/exec/aggregation_sink_operator.h | 7 +- .../exec/aggregation_source_operator.cpp | 4 +- .../pipeline/exec/analytic_sink_operator.cpp | 4 +- .../exec/analytic_source_operator.cpp | 1 + .../exec/assert_num_rows_operator.cpp | 1 + ...istinct_streaming_aggregation_operator.cpp | 4 +- .../distinct_streaming_aggregation_operator.h | 4 + be/src/pipeline/exec/exchange_sink_operator.h | 1 + .../exec/join_build_sink_operator.cpp | 2 + be/src/pipeline/exec/join_probe_operator.cpp | 1 + .../exec/nested_loop_join_probe_operator.h | 4 +- be/src/pipeline/exec/operator.cpp | 9 +- be/src/pipeline/exec/operator.h | 6 +- ...artitioned_aggregation_source_operator.cpp | 4 + .../partitioned_aggregation_source_operator.h | 2 + be/src/pipeline/exec/sort_sink_operator.cpp | 4 +- be/src/pipeline/exec/sort_sink_operator.h | 3 +- be/src/pipeline/exec/sort_source_operator.cpp | 4 +- be/src/pipeline/exec/union_source_operator.h | 4 +- .../local_exchange_sink_operator.cpp | 6 +- .../local_exchange_sink_operator.h | 2 +- be/src/pipeline/pipeline.cpp | 43 ++++++- be/src/pipeline/pipeline.h | 44 +++---- be/src/pipeline/pipeline_fragment_context.cpp | 120 +++++++++++------- be/src/pipeline/pipeline_fragment_context.h | 6 +- .../apache/doris/planner/AggregationNode.java | 5 + .../doris/planner/AnalyticEvalNode.java | 5 + .../doris/planner/AssertNumRowsNode.java | 5 + .../apache/doris/planner/DataPartition.java | 4 + .../apache/doris/planner/EmptySetNode.java | 4 + .../apache/doris/planner/ExchangeNode.java | 10 ++ .../apache/doris/planner/JoinNodeBase.java | 1 - .../doris/planner/NestedLoopJoinNode.java | 17 +++ .../apache/doris/planner/PlanFragment.java | 35 +++++ .../org/apache/doris/planner/PlanNode.java | 10 ++ .../org/apache/doris/planner/RepeatNode.java | 5 + .../org/apache/doris/planner/ScanNode.java | 5 + .../org/apache/doris/planner/SelectNode.java | 5 + .../org/apache/doris/planner/SortNode.java | 5 + .../org/apache/doris/planner/UnionNode.java | 5 + .../java/org/apache/doris/qe/Coordinator.java | 22 ++++ gensrc/thrift/PlanNodes.thrift | 1 + gensrc/thrift/Planner.thrift | 4 + .../insert_into_table/complex_insert.groovy | 6 +- .../distribute/local_shuffle.groovy | 12 +- 47 files changed, 361 insertions(+), 108 deletions(-) diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index 0885dbf380f32f..8060ee8362dede 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ 
-111,19 +111,19 @@ class Dependency : public std::enable_shared_from_this { // Notify downstream pipeline tasks this dependency is ready. void set_ready(); void set_ready_to_read() { - DCHECK(_shared_state->source_deps.size() == 1) << debug_string(); + DCHECK_EQ(_shared_state->source_deps.size(), 1) << debug_string(); _shared_state->source_deps.front()->set_ready(); } void set_block_to_read() { - DCHECK(_shared_state->source_deps.size() == 1) << debug_string(); + DCHECK_EQ(_shared_state->source_deps.size(), 1) << debug_string(); _shared_state->source_deps.front()->block(); } void set_ready_to_write() { - DCHECK(_shared_state->sink_deps.size() == 1) << debug_string(); + DCHECK_EQ(_shared_state->sink_deps.size(), 1) << debug_string(); _shared_state->sink_deps.front()->set_ready(); } void set_block_to_write() { - DCHECK(_shared_state->sink_deps.size() == 1) << debug_string(); + DCHECK_EQ(_shared_state->sink_deps.size(), 1) << debug_string(); _shared_state->sink_deps.front()->block(); } diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index 4007f50f58aff9..5fb14c025850b4 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -717,7 +717,10 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPla : tnode.agg_node.grouping_exprs), _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate), _require_bucket_distribution(require_bucket_distribution), - _agg_fn_output_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {} + _agg_fn_output_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples), + _without_key(tnode.agg_node.grouping_exprs.empty()) { + _is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator; +} Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state)); diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index 1f846ec88ff4e8..8271f1451b4320 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -143,9 +143,8 @@ class AggSinkOperatorX final : public DataSinkOperatorX { DataDistribution required_data_distribution() const override { if (_probe_expr_ctxs.empty()) { - return _needs_finalize || DataSinkOperatorX::_child - ->ignore_data_distribution() - ? DataDistribution(ExchangeType::PASSTHROUGH) + return _needs_finalize + ? 
DataDistribution(ExchangeType::NOOP) : DataSinkOperatorX::required_data_distribution(); } return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator @@ -204,8 +203,8 @@ class AggSinkOperatorX final : public DataSinkOperatorX { const std::vector _partition_exprs; const bool _is_colocate; const bool _require_bucket_distribution; - RowDescriptor _agg_fn_output_row_descriptor; + const bool _without_key; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp index c68601fcdca39c..6d4cd291079cb6 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/aggregation_source_operator.cpp @@ -441,7 +441,9 @@ AggSourceOperatorX::AggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : Base(pool, tnode, operator_id, descs), _needs_finalize(tnode.agg_node.need_finalize), - _without_key(tnode.agg_node.grouping_exprs.empty()) {} + _without_key(tnode.agg_node.grouping_exprs.empty()) { + _is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator; +} Status AggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) { auto& local_state = get_local_state(state); diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index 1da5c1f7c35445..afe9aeab8fdb84 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -201,7 +201,9 @@ AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, int operator_id, _require_bucket_distribution(require_bucket_distribution), _partition_exprs(tnode.__isset.distribute_expr_lists && require_bucket_distribution ? 
tnode.distribute_expr_lists[0] - : tnode.analytic_node.partition_exprs) {} + : tnode.analytic_node.partition_exprs) { + _is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator; +} Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state)); diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp index 134a0ad82d7a05..019f95042c2e4a 100644 --- a/be/src/pipeline/exec/analytic_source_operator.cpp +++ b/be/src/pipeline/exec/analytic_source_operator.cpp @@ -475,6 +475,7 @@ AnalyticSourceOperatorX::AnalyticSourceOperatorX(ObjectPool* pool, const TPlanNo _has_range_window(tnode.analytic_node.window.type == TAnalyticWindowType::RANGE), _has_window_start(tnode.analytic_node.window.__isset.window_start), _has_window_end(tnode.analytic_node.window.__isset.window_end) { + _is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator; _fn_scope = AnalyticFnScope::PARTITION; if (tnode.analytic_node.__isset.window && tnode.analytic_node.window.type == TAnalyticWindowType::RANGE) { diff --git a/be/src/pipeline/exec/assert_num_rows_operator.cpp b/be/src/pipeline/exec/assert_num_rows_operator.cpp index c1a02b6f838376..345e42b7d96837 100644 --- a/be/src/pipeline/exec/assert_num_rows_operator.cpp +++ b/be/src/pipeline/exec/assert_num_rows_operator.cpp @@ -27,6 +27,7 @@ AssertNumRowsOperatorX::AssertNumRowsOperatorX(ObjectPool* pool, const TPlanNode : StreamingOperatorX(pool, tnode, operator_id, descs), _desired_num_rows(tnode.assert_num_rows_node.desired_num_rows), _subquery_string(tnode.assert_num_rows_node.subquery_string) { + _is_serial_operator = true; if (tnode.assert_num_rows_node.__isset.assertion) { _assertion = tnode.assert_num_rows_node.assertion; } else { diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp index ddec533a9ff157..a59af8ce7b474a 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp @@ -326,7 +326,9 @@ DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, i ? 
tnode.distribute_expr_lists[0] : tnode.agg_node.grouping_exprs), _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate), - _require_bucket_distribution(require_bucket_distribution) { + _require_bucket_distribution(require_bucket_distribution), + _without_key(tnode.agg_node.grouping_exprs.empty()) { + _is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator; if (tnode.agg_node.__isset.use_streaming_preaggregation) { _is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation; if (_is_streaming_preagg) { diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h index 71d289402ec6d8..1f7a21190ad769 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h @@ -104,6 +104,9 @@ class DistinctStreamingAggOperatorX final bool need_more_input_data(RuntimeState* state) const override; DataDistribution required_data_distribution() const override { + if (_needs_finalize && _probe_expr_ctxs.empty()) { + return {ExchangeType::NOOP}; + } if (_needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg)) { return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) @@ -136,6 +139,7 @@ class DistinctStreamingAggOperatorX final /// The total size of the row from the aggregate functions. size_t _total_size_of_aggregate_states = 0; bool _is_streaming_preagg = false; + const bool _without_key; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 8af944728a2d33..689172dfc6b9a6 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -224,6 +224,7 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX::JoinBuildSinkOperatorX(ObjectPool* pool, _short_circuit_for_null_in_build_side(_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_mark_join), _runtime_filter_descs(tnode.runtime_filters) { + DataSinkOperatorX::_is_serial_operator = + tnode.__isset.is_serial_operator && tnode.is_serial_operator; _init_join_op(); if (_is_mark_join) { DCHECK(_join_op == TJoinOp::LEFT_ANTI_JOIN || _join_op == TJoinOp::LEFT_SEMI_JOIN || diff --git a/be/src/pipeline/exec/join_probe_operator.cpp b/be/src/pipeline/exec/join_probe_operator.cpp index 8e5010d7513ad1..76dc75a90d8f3c 100644 --- a/be/src/pipeline/exec/join_probe_operator.cpp +++ b/be/src/pipeline/exec/join_probe_operator.cpp @@ -220,6 +220,7 @@ JoinProbeOperatorX::JoinProbeOperatorX(ObjectPool* pool, const T : true) ) { + Base::_is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator; if (tnode.__isset.hash_join_node) { _intermediate_row_desc.reset(new RowDescriptor( descs, tnode.hash_join_node.vintermediate_tuple_id_list, diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h b/be/src/pipeline/exec/nested_loop_join_probe_operator.h index 4121de64210dd5..5b0fec159e28bf 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h @@ -203,7 +203,9 @@ class NestedLoopJoinProbeOperatorX final } DataDistribution required_data_distribution() const override { - if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { + if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || + _join_op == 
TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::RIGHT_ANTI_JOIN || + _join_op == TJoinOp::RIGHT_SEMI_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN) { return {ExchangeType::NOOP}; } return {ExchangeType::ADAPTIVE_PASSTHROUGH}; diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp index 5a13fdcbd8482f..6e3099db7486bc 100644 --- a/be/src/pipeline/exec/operator.cpp +++ b/be/src/pipeline/exec/operator.cpp @@ -141,8 +141,9 @@ std::string PipelineXSinkLocalState::debug_string(int indentatio std::string OperatorXBase::debug_string(int indentation_level) const { fmt::memory_buffer debug_string_buffer; - fmt::format_to(debug_string_buffer, "{}{}: id={}, parallel_tasks={}", - std::string(indentation_level * 2, ' '), _op_name, node_id(), _parallel_tasks); + fmt::format_to(debug_string_buffer, "{}{}: id={}, parallel_tasks={}, _is_serial_operator={}", + std::string(indentation_level * 2, ' '), _op_name, node_id(), _parallel_tasks, + _is_serial_operator); return fmt::to_string(debug_string_buffer); } @@ -363,8 +364,8 @@ void PipelineXLocalStateBase::reached_limit(vectorized::Block* block, bool* eos) std::string DataSinkOperatorXBase::debug_string(int indentation_level) const { fmt::memory_buffer debug_string_buffer; - fmt::format_to(debug_string_buffer, "{}{}: id={}", std::string(indentation_level * 2, ' '), - _name, node_id()); + fmt::format_to(debug_string_buffer, "{}{}: id={}, _is_serial_operator={}", + std::string(indentation_level * 2, ' '), _name, node_id(), _is_serial_operator); return fmt::to_string(debug_string_buffer); } diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index b5bd0fe4713085..5df0a19498f395 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -101,6 +101,9 @@ class OperatorBase { return Status::OK(); } + // Operators need to be executed serially. (e.g. 
finalized agg without key) + [[nodiscard]] virtual bool is_serial_operator() const { return _is_serial_operator; } + [[nodiscard]] bool is_closed() const { return _is_closed; } virtual size_t revocable_mem_size(RuntimeState* state) const { return 0; } @@ -122,6 +125,7 @@ class OperatorBase { bool _is_closed; bool _followed_by_shuffled_operator = false; + bool _is_serial_operator = false; }; class PipelineXLocalStateBase { @@ -444,7 +448,7 @@ class DataSinkOperatorXBase : public OperatorBase { Status init(const TDataSink& tsink) override; [[nodiscard]] virtual Status init(ExchangeType type, const int num_buckets, - const bool is_shuffled_hash_join, + const bool use_global_hash_shuffle, const std::map& shuffle_idx_to_instance_idx) { return Status::InternalError("init() is only implemented in local exchange!"); } diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index 48df5587198b08..655a6e19725a9b 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -118,6 +118,10 @@ Status PartitionedAggSourceOperatorX::close(RuntimeState* state) { return _agg_source_operator->close(state); } +bool PartitionedAggSourceOperatorX::is_serial_operator() const { + return _agg_source_operator->is_serial_operator(); +} + Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) { auto& local_state = get_local_state(state); diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h index edae99c716a925..7e73241745e029 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h @@ -91,6 +91,8 @@ class PartitionedAggSourceOperatorX : public OperatorX bool is_source() const override { return true; } + bool is_serial_operator() const override; + private: friend class PartitionedAggLocalState; diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp index ee8689a8084e5c..6d6684437b8124 100644 --- a/be/src/pipeline/exec/sort_sink_operator.cpp +++ b/be/src/pipeline/exec/sort_sink_operator.cpp @@ -90,7 +90,9 @@ SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const TP : std::vector {}), _algorithm(tnode.sort_node.__isset.algorithm ? 
tnode.sort_node.algorithm : TSortAlgorithm::FULL_SORT), - _reuse_mem(_algorithm != TSortAlgorithm::HEAP_SORT) {} + _reuse_mem(_algorithm != TSortAlgorithm::HEAP_SORT) { + _is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator; +} Status SortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state)); diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h index 8462472dd02671..0829c38b40f0b5 100644 --- a/be/src/pipeline/exec/sort_sink_operator.h +++ b/be/src/pipeline/exec/sort_sink_operator.h @@ -69,8 +69,9 @@ class SortSinkOperatorX final : public DataSinkOperatorX { } else if (_merge_by_exchange) { // The current sort node is used for the ORDER BY return {ExchangeType::PASSTHROUGH}; + } else { + return {ExchangeType::NOOP}; } - return DataSinkOperatorX::required_data_distribution(); } bool require_shuffled_data_distribution() const override { return _is_analytic_sort; } bool require_data_distribution() const override { return _is_colocate; } diff --git a/be/src/pipeline/exec/sort_source_operator.cpp b/be/src/pipeline/exec/sort_source_operator.cpp index 02a99e183c852e..7f801b79c0b12b 100644 --- a/be/src/pipeline/exec/sort_source_operator.cpp +++ b/be/src/pipeline/exec/sort_source_operator.cpp @@ -30,7 +30,9 @@ SortSourceOperatorX::SortSourceOperatorX(ObjectPool* pool, const TPlanNode& tnod const DescriptorTbl& descs) : OperatorX(pool, tnode, operator_id, descs), _merge_by_exchange(tnode.sort_node.merge_by_exchange), - _offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0) {} + _offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0) { + _is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator; +} Status SortSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(Base::init(tnode, state)); diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h index 2d112ebf2df579..200e7de8597b91 100644 --- a/be/src/pipeline/exec/union_source_operator.h +++ b/be/src/pipeline/exec/union_source_operator.h @@ -63,7 +63,9 @@ class UnionSourceOperatorX final : public OperatorX { using Base = OperatorX; UnionSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs) - : Base(pool, tnode, operator_id, descs), _child_size(tnode.num_children) {}; + : Base(pool, tnode, operator_id, descs), _child_size(tnode.num_children) { + _is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator; + } ~UnionSourceOperatorX() override = default; Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp index d87113ca80a959..ff243186c47c43 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp @@ -36,17 +36,17 @@ std::vector LocalExchangeSinkLocalState::dependencies() const { } Status LocalExchangeSinkOperatorX::init(ExchangeType type, const int num_buckets, - const bool should_disable_bucket_shuffle, + const bool use_global_hash_shuffle, const std::map& shuffle_idx_to_instance_idx) { _name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + get_exchange_type_name(type) + ")"; _type = type; if (_type == ExchangeType::HASH_SHUFFLE) { - _use_global_shuffle = 
should_disable_bucket_shuffle; + _use_global_shuffle = use_global_hash_shuffle; // For shuffle join, if data distribution has been broken by previous operator, we // should use a HASH_SHUFFLE local exchanger to shuffle data again. To be mentioned, // we should use map shuffle idx to instance idx because all instances will be // distributed to all BEs. Otherwise, we should use shuffle idx directly. - if (should_disable_bucket_shuffle) { + if (use_global_hash_shuffle) { std::for_each(shuffle_idx_to_instance_idx.begin(), shuffle_idx_to_instance_idx.end(), [&](const auto& item) { DCHECK(item.first != -1); diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h index 1cd9736d4291d6..09b1f2cc3105f2 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h @@ -100,7 +100,7 @@ class LocalExchangeSinkOperatorX final : public DataSinkOperatorX& shuffle_idx_to_instance_idx) override; Status open(RuntimeState* state) override; diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp index 6e83c7805e46fc..5b93fbdf1f8480 100644 --- a/be/src/pipeline/pipeline.cpp +++ b/be/src/pipeline/pipeline.cpp @@ -22,6 +22,7 @@ #include #include "pipeline/exec/operator.h" +#include "pipeline/pipeline_fragment_context.h" #include "pipeline/pipeline_task.h" namespace doris::pipeline { @@ -31,7 +32,47 @@ void Pipeline::_init_profile() { _pipeline_profile = std::make_unique(std::move(s)); } -Status Pipeline::add_operator(OperatorPtr& op) { +bool Pipeline::need_to_local_exchange(const DataDistribution target_data_distribution, + const int idx) const { + // If serial operator exists after `idx`-th operator, we should not improve parallelism. 
+ if (std::any_of(_operators.begin() + idx, _operators.end(), + [&](OperatorPtr op) -> bool { return op->is_serial_operator(); })) { + return false; + } + if (std::all_of(_operators.begin(), _operators.end(), + [&](OperatorPtr op) -> bool { return op->is_serial_operator(); })) { + if (!_sink->is_serial_operator()) { + return true; + } + } else if (std::any_of(_operators.begin(), _operators.end(), + [&](OperatorPtr op) -> bool { return op->is_serial_operator(); })) { + return true; + } + + if (target_data_distribution.distribution_type != ExchangeType::BUCKET_HASH_SHUFFLE && + target_data_distribution.distribution_type != ExchangeType::HASH_SHUFFLE) { + return true; + } else if (_operators.front()->ignore_data_hash_distribution()) { + if (_data_distribution.distribution_type == target_data_distribution.distribution_type && + (_data_distribution.partition_exprs.empty() || + target_data_distribution.partition_exprs.empty())) { + return true; + } + return _data_distribution.distribution_type != target_data_distribution.distribution_type && + !(is_hash_exchange(_data_distribution.distribution_type) && + is_hash_exchange(target_data_distribution.distribution_type)); + } else { + return _data_distribution.distribution_type != target_data_distribution.distribution_type && + !(is_hash_exchange(_data_distribution.distribution_type) && + is_hash_exchange(target_data_distribution.distribution_type)); + } +} + +Status Pipeline::add_operator(OperatorPtr& op, const int parallelism) { + if (parallelism > 0 && op->is_serial_operator()) { + set_num_tasks(parallelism); + op->set_ignore_data_distribution(); + } op->set_parallel_tasks(num_tasks()); _operators.emplace_back(op); if (op->is_source()) { diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index 8a20ccb631cc47..ef0ae9e9a75aa2 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -44,14 +44,16 @@ class Pipeline : public std::enable_shared_from_this { public: explicit Pipeline(PipelineId pipeline_id, int num_tasks, - std::weak_ptr context) - : _pipeline_id(pipeline_id), _num_tasks(num_tasks) { + std::weak_ptr context, int num_tasks_of_parent) + : _pipeline_id(pipeline_id), + _num_tasks(num_tasks), + _num_tasks_of_parent(num_tasks_of_parent) { _init_profile(); _tasks.resize(_num_tasks, nullptr); } // Add operators for pipelineX - Status add_operator(OperatorPtr& op); + Status add_operator(OperatorPtr& op, const int parallelism); // prepare operators for pipelineX Status prepare(RuntimeState* state); @@ -71,28 +73,8 @@ class Pipeline : public std::enable_shared_from_this { return idx == ExchangeType::HASH_SHUFFLE || idx == ExchangeType::BUCKET_HASH_SHUFFLE; } - bool need_to_local_exchange(const DataDistribution target_data_distribution) const { - if (target_data_distribution.distribution_type != ExchangeType::BUCKET_HASH_SHUFFLE && - target_data_distribution.distribution_type != ExchangeType::HASH_SHUFFLE) { - return true; - } else if (_operators.front()->ignore_data_hash_distribution()) { - if (_data_distribution.distribution_type == - target_data_distribution.distribution_type && - (_data_distribution.partition_exprs.empty() || - target_data_distribution.partition_exprs.empty())) { - return true; - } - return _data_distribution.distribution_type != - target_data_distribution.distribution_type && - !(is_hash_exchange(_data_distribution.distribution_type) && - is_hash_exchange(target_data_distribution.distribution_type)); - } else { - return _data_distribution.distribution_type != - 
target_data_distribution.distribution_type && - !(is_hash_exchange(_data_distribution.distribution_type) && - is_hash_exchange(target_data_distribution.distribution_type)); - } - } + bool need_to_local_exchange(const DataDistribution target_data_distribution, + const int idx) const; void init_data_distribution() { set_data_distribution(_operators.front()->required_data_distribution()); } @@ -120,6 +102,14 @@ class Pipeline : public std::enable_shared_from_this { for (auto& op : _operators) { op->set_parallel_tasks(_num_tasks); } + +#ifndef NDEBUG + if (num_tasks > 1 && + std::any_of(_operators.begin(), _operators.end(), + [&](OperatorPtr op) -> bool { return op->is_serial_operator(); })) { + DCHECK(false) << debug_string(); + } +#endif } int num_tasks() const { return _num_tasks; } bool close_task() { return _num_tasks_running.fetch_sub(1) == 1; } @@ -136,6 +126,8 @@ class Pipeline : public std::enable_shared_from_this { return fmt::to_string(debug_string_buffer); } + int num_tasks_of_parent() const { return _num_tasks_of_parent; } + private: void _init_profile(); @@ -173,6 +165,8 @@ class Pipeline : public std::enable_shared_from_this { std::atomic _num_tasks_running = 0; // Tasks in this pipeline. std::vector _tasks; + // Parallelism of parent pipeline. + const int _num_tasks_of_parent; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 28cfefbf6c1bd0..fd3baefa76f9c7 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -214,8 +214,9 @@ void PipelineFragmentContext::cancel(const Status reason) { PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) { PipelineId id = _next_pipeline_id++; auto pipeline = std::make_shared( - id, _num_instances, - std::dynamic_pointer_cast(shared_from_this())); + id, parent ? std::min(parent->num_tasks(), _num_instances) : _num_instances, + std::dynamic_pointer_cast(shared_from_this()), + parent ? parent->num_tasks() : _num_instances); if (idx >= 0) { _pipelines.insert(_pipelines.begin() + idx, pipeline); } else { @@ -235,6 +236,8 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re if (request.__isset.query_options && request.query_options.__isset.execution_timeout) { _timeout = request.query_options.execution_timeout; } + _use_serial_source = + request.fragment.__isset.use_serial_source && request.fragment.use_serial_source; _fragment_level_profile = std::make_unique("PipelineContext"); _prepare_timer = ADD_TIMER(_fragment_level_profile, "PrepareTime"); @@ -749,13 +752,12 @@ Status PipelineFragmentContext::_add_local_exchange_impl( const bool followed_by_shuffled_operator = operators.size() > idx ? operators[idx]->followed_by_shuffled_operator() : cur_pipe->sink()->followed_by_shuffled_operator(); - const bool should_disable_bucket_shuffle = + const bool use_global_hash_shuffle = bucket_seq_to_instance_idx.empty() && shuffle_idx_to_instance_idx.find(-1) == shuffle_idx_to_instance_idx.end() && - followed_by_shuffled_operator; + followed_by_shuffled_operator && !_use_serial_source; sink.reset(new LocalExchangeSinkOperatorX( - sink_id, local_exchange_id, - should_disable_bucket_shuffle ? _total_instances : _num_instances, + sink_id, local_exchange_id, use_global_hash_shuffle ? 
_total_instances : _num_instances, data_distribution.partition_exprs, bucket_seq_to_instance_idx)); if (bucket_seq_to_instance_idx.empty() && data_distribution.distribution_type == ExchangeType::BUCKET_HASH_SHUFFLE) { @@ -763,8 +765,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl( } RETURN_IF_ERROR(new_pip->set_sink(sink)); RETURN_IF_ERROR(new_pip->sink()->init(data_distribution.distribution_type, num_buckets, - should_disable_bucket_shuffle, - shuffle_idx_to_instance_idx)); + use_global_hash_shuffle, shuffle_idx_to_instance_idx)); // 2. Create and initialize LocalExchangeSharedState. std::shared_ptr shared_state = @@ -775,7 +776,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl( case ExchangeType::HASH_SHUFFLE: shared_state->exchanger = ShuffleExchanger::create_unique( std::max(cur_pipe->num_tasks(), _num_instances), - should_disable_bucket_shuffle ? _total_instances : _num_instances, + use_global_hash_shuffle ? _total_instances : _num_instances, _runtime_state->query_options().__isset.local_exchange_free_blocks_limit ? _runtime_state->query_options().local_exchange_free_blocks_limit : 0); @@ -915,11 +916,11 @@ Status PipelineFragmentContext::_add_local_exchange( const std::map& bucket_seq_to_instance_idx, const std::map& shuffle_idx_to_instance_idx, const bool ignore_data_distribution) { - if (_num_instances <= 1) { + if (_num_instances <= 1 || cur_pipe->num_tasks_of_parent() <= 1) { return Status::OK(); } - if (!cur_pipe->need_to_local_exchange(data_distribution)) { + if (!cur_pipe->need_to_local_exchange(data_distribution, idx)) { return Status::OK(); } *do_local_exchange = true; @@ -1154,7 +1155,8 @@ Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS // 1. create and set the source operator of multi_cast_data_stream_source for new pipeline source_op.reset(new MultiCastDataStreamerSourceOperatorX( i, pool, thrift_sink.multi_cast_stream_sink.sinks[i], row_desc, source_id)); - RETURN_IF_ERROR(new_pipeline->add_operator(source_op)); + RETURN_IF_ERROR(new_pipeline->add_operator( + source_op, params.__isset.parallel_instances ? params.parallel_instances : 0)); // 2. create and set sink operator of data stream sender for new pipeline DataSinkOperatorPtr sink_op; @@ -1203,7 +1205,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo op.reset(new OlapScanOperatorX( pool, tnode, next_operator_id(), descs, _num_instances, enable_query_cache ? request.fragment.query_cache_param : TQueryCacheParam {})); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); if (request.__isset.parallel_instances) { cur_pipe->set_num_tasks(request.parallel_instances); op->set_ignore_data_distribution(); @@ -1216,7 +1219,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo _query_ctx->query_mem_tracker->is_group_commit_load = true; #endif op.reset(new GroupCommitOperatorX(pool, tnode, next_operator_id(), descs, _num_instances)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? 
request.parallel_instances : 0)); if (request.__isset.parallel_instances) { cur_pipe->set_num_tasks(request.parallel_instances); op->set_ignore_data_distribution(); @@ -1226,7 +1230,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo case doris::TPlanNodeType::JDBC_SCAN_NODE: { if (config::enable_java_support) { op.reset(new JDBCScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); } else { return Status::InternalError( "Jdbc scan node is disabled, you can change be config enable_java_support " @@ -1240,7 +1245,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo } case doris::TPlanNodeType::FILE_SCAN_NODE: { op.reset(new FileScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); if (request.__isset.parallel_instances) { cur_pipe->set_num_tasks(request.parallel_instances); op->set_ignore_data_distribution(); @@ -1250,7 +1256,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo case TPlanNodeType::ES_SCAN_NODE: case TPlanNodeType::ES_HTTP_SCAN_NODE: { op.reset(new EsScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); if (request.__isset.parallel_instances) { cur_pipe->set_num_tasks(request.parallel_instances); op->set_ignore_data_distribution(); @@ -1261,7 +1268,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo int num_senders = find_with_default(request.per_exch_num_senders, tnode.node_id, 0); DCHECK_GT(num_senders, 0); op.reset(new ExchangeSourceOperatorX(pool, tnode, next_operator_id(), descs, num_senders)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); if (request.__isset.parallel_instances) { op->set_ignore_data_distribution(); cur_pipe->set_num_tasks(request.parallel_instances); @@ -1280,7 +1288,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo auto cache_source_id = next_operator_id(); op.reset(new CacheSourceOperatorX(pool, cache_node_id, cache_source_id, request.fragment.query_cache_param)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); const auto downstream_pipeline_id = cur_pipe->id(); if (_dag.find(downstream_pipeline_id) == _dag.end()) { @@ -1315,7 +1324,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo _require_bucket_distribution)); op->set_followed_by_shuffled_operator(false); _require_bucket_distribution = true; - RETURN_IF_ERROR(new_pipe->add_operator(op)); + RETURN_IF_ERROR(new_pipe->add_operator( + op, request.__isset.parallel_instances ? 
request.parallel_instances : 0)); RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op)); cur_pipe = new_pipe; } else { @@ -1324,7 +1334,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo op->set_followed_by_shuffled_operator(followed_by_shuffled_operator); _require_bucket_distribution = _require_bucket_distribution || op->require_data_distribution(); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); } } else if (tnode.agg_node.__isset.use_streaming_preaggregation && tnode.agg_node.use_streaming_preaggregation && @@ -1335,11 +1346,13 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo op.reset(new StreamingAggOperatorX(pool, next_operator_id(), tnode, descs)); RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op)); - RETURN_IF_ERROR(new_pipe->add_operator(op)); + RETURN_IF_ERROR(new_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); cur_pipe = new_pipe; } else { op.reset(new StreamingAggOperatorX(pool, next_operator_id(), tnode, descs)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); } } else { // create new pipeline to add query cache operator @@ -1355,10 +1368,12 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo } if (enable_query_cache) { RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op)); - RETURN_IF_ERROR(new_pipe->add_operator(op)); + RETURN_IF_ERROR(new_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); cur_pipe = new_pipe; } else { - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); } const auto downstream_pipeline_id = cur_pipe->id(); @@ -1406,7 +1421,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo pool, tnode_, next_operator_id(), descs, partition_count); probe_operator->set_inner_operators(inner_sink_operator, inner_probe_operator); op = std::move(probe_operator); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); const auto downstream_pipeline_id = cur_pipe->id(); if (_dag.find(downstream_pipeline_id) == _dag.end()) { @@ -1430,7 +1446,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo op->set_followed_by_shuffled_operator(op->is_shuffled_operator()); } else { op.reset(new HashJoinProbeOperatorX(pool, tnode, next_operator_id(), descs)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); const auto downstream_pipeline_id = cur_pipe->id(); if (_dag.find(downstream_pipeline_id) == _dag.end()) { @@ -1457,7 +1474,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo } case TPlanNodeType::CROSS_JOIN_NODE: { op.reset(new NestedLoopJoinProbeOperatorX(pool, tnode, next_operator_id(), descs)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? 
request.parallel_instances : 0)); const auto downstream_pipeline_id = cur_pipe->id(); if (_dag.find(downstream_pipeline_id) == _dag.end()) { @@ -1480,7 +1498,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo int child_count = tnode.num_children; op.reset(new UnionSourceOperatorX(pool, tnode, next_operator_id(), descs)); op->set_followed_by_shuffled_operator(_require_bucket_distribution); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); const auto downstream_pipeline_id = cur_pipe->id(); if (_dag.find(downstream_pipeline_id) == _dag.end()) { @@ -1508,7 +1527,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo } else { op.reset(new SortSourceOperatorX(pool, tnode, next_operator_id(), descs)); } - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); const auto downstream_pipeline_id = cur_pipe->id(); if (_dag.find(downstream_pipeline_id) == _dag.end()) { @@ -1535,7 +1555,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo } case doris::TPlanNodeType::PARTITION_SORT_NODE: { op.reset(new PartitionSortSourceOperatorX(pool, tnode, next_operator_id(), descs)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); const auto downstream_pipeline_id = cur_pipe->id(); if (_dag.find(downstream_pipeline_id) == _dag.end()) { @@ -1553,7 +1574,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo } case TPlanNodeType::ANALYTIC_EVAL_NODE: { op.reset(new AnalyticSourceOperatorX(pool, tnode, next_operator_id(), descs)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); const auto downstream_pipeline_id = cur_pipe->id(); if (_dag.find(downstream_pipeline_id) == _dag.end()) { @@ -1575,39 +1597,44 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo } case TPlanNodeType::INTERSECT_NODE: { RETURN_IF_ERROR(_build_operators_for_set_operation_node( - pool, tnode, descs, op, cur_pipe, parent_idx, child_idx)); + pool, tnode, descs, op, cur_pipe, parent_idx, child_idx, request)); op->set_followed_by_shuffled_operator(_require_bucket_distribution); break; } case TPlanNodeType::EXCEPT_NODE: { RETURN_IF_ERROR(_build_operators_for_set_operation_node( - pool, tnode, descs, op, cur_pipe, parent_idx, child_idx)); + pool, tnode, descs, op, cur_pipe, parent_idx, child_idx, request)); op->set_followed_by_shuffled_operator(_require_bucket_distribution); break; } case TPlanNodeType::REPEAT_NODE: { op.reset(new RepeatOperatorX(pool, tnode, next_operator_id(), descs)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); break; } case TPlanNodeType::TABLE_FUNCTION_NODE: { op.reset(new TableFunctionOperatorX(pool, tnode, next_operator_id(), descs)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? 
request.parallel_instances : 0)); break; } case TPlanNodeType::ASSERT_NUM_ROWS_NODE: { op.reset(new AssertNumRowsOperatorX(pool, tnode, next_operator_id(), descs)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); break; } case TPlanNodeType::EMPTY_SET_NODE: { op.reset(new EmptySetSourceOperatorX(pool, tnode, next_operator_id(), descs)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); break; } case TPlanNodeType::DATA_GEN_SCAN_NODE: { op.reset(new DataGenSourceOperatorX(pool, tnode, next_operator_id(), descs)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); if (request.__isset.parallel_instances) { cur_pipe->set_num_tasks(request.parallel_instances); op->set_ignore_data_distribution(); @@ -1616,17 +1643,20 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo } case TPlanNodeType::SCHEMA_SCAN_NODE: { op.reset(new SchemaScanOperatorX(pool, tnode, next_operator_id(), descs)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); break; } case TPlanNodeType::META_SCAN_NODE: { op.reset(new MetaScanOperatorX(pool, tnode, next_operator_id(), descs)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); break; } case TPlanNodeType::SELECT_NODE: { op.reset(new SelectOperatorX(pool, tnode, next_operator_id(), descs)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); break; } default: @@ -1642,9 +1672,11 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo template Status PipelineFragmentContext::_build_operators_for_set_operation_node( ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorPtr& op, - PipelinePtr& cur_pipe, int parent_idx, int child_idx) { + PipelinePtr& cur_pipe, int parent_idx, int child_idx, + const doris::TPipelineFragmentParams& request) { op.reset(new SetSourceOperatorX(pool, tnode, next_operator_id(), descs)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? 
request.parallel_instances : 0)); const auto downstream_pipeline_id = cur_pipe->id(); if (_dag.find(downstream_pipeline_id) == _dag.end()) { diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 0749729789e2f4..6caa0e5c106722 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -121,7 +121,7 @@ class PipelineFragmentContext : public TaskExecutionContext { _tasks[j][i]->stop_if_finished(); } } - }; + } private: Status _build_pipelines(ObjectPool* pool, const doris::TPipelineFragmentParams& request, @@ -140,7 +140,8 @@ class PipelineFragmentContext : public TaskExecutionContext { Status _build_operators_for_set_operation_node(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorPtr& op, PipelinePtr& cur_pipe, int parent_idx, - int child_idx); + int child_idx, + const doris::TPipelineFragmentParams& request); Status _create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink, const std::vector& output_exprs, @@ -224,6 +225,7 @@ class PipelineFragmentContext : public TaskExecutionContext { int _num_instances = 1; int _timeout = -1; + bool _use_serial_source = false; OperatorPtr _root_op = nullptr; // this is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines. diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java index 4dca9384d65794..55d1b4b50c0ecd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java @@ -488,6 +488,11 @@ public void finalize(Analyzer analyzer) throws UserException { } } + @Override + public boolean isSerialOperator() { + return aggInfo.getGroupingExprs().isEmpty() && needsFinalize; + } + public void setColocate(boolean colocate) { isColocate = colocate; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java index cdbf827aed979e..dce6c3d1b04e42 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java @@ -296,4 +296,9 @@ public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { return output.toString(); } + + @Override + public boolean isSerialOperator() { + return partitionExprs.isEmpty(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java index 57d9ce8742fb3a..a4c4aa42c6579c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java @@ -116,4 +116,9 @@ protected void toThrift(TPlanNode msg) { public int getNumInstances() { return 1; } + + @Override + public boolean isSerialOperator() { + return true; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java index 9c6ba83408ac00..ce57a57c37780a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java @@ -90,6 +90,10 @@ public boolean isBucketShuffleHashPartition() { return type == 
TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED; } + public boolean isTabletSinkShufflePartition() { + return type == TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED; + } + public TPartitionType getType() { return type; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java index 867c220d9feecf..f6ddf23429eddb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java @@ -81,4 +81,8 @@ public int getNumInstances() { return 1; } + @Override + public boolean isSerialOperator() { + return true; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java index 4ada9a82f7c752..7af092871915e5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java @@ -195,6 +195,11 @@ public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { return prefix + "offset: " + offset + "\n"; } + @Override + public boolean isMerging() { + return mergeInfo != null; + } + public boolean isRightChildOfBroadcastHashJoin() { return isRightChildOfBroadcastHashJoin; } @@ -202,4 +207,9 @@ public boolean isRightChildOfBroadcastHashJoin() { public void setRightChildOfBroadcastHashJoin(boolean value) { isRightChildOfBroadcastHashJoin = value; } + + @Override + public boolean isSerialOperator() { + return true; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java b/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java index 91a3c26e7708fa..5dc81e29d85913 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java @@ -597,7 +597,6 @@ public void setUseSpecificProjections(boolean useSpecificProjections) { this.useSpecificProjections = useSpecificProjections; } - public boolean isUseSpecificProjections() { return useSpecificProjections; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java index 30c0a2d0394fa7..c7b3525e4cdb6c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java @@ -281,4 +281,21 @@ public String getNodeExplainString(String detailPrefix, TExplainLevel detailLeve } return output.toString(); } + + /** + * If joinOp is one of type below: + * 1. NULL_AWARE_LEFT_ANTI_JOIN + * 2. RIGHT_OUTER_JOIN + * 3. RIGHT_ANTI_JOIN + * 4. 
RIGHT_SEMI_JOIN
+ * 5. FULL_OUTER_JOIN
+ *
+ * We will execute this nested loop join serially, so it is marked as a serial operator.
+ * @return true if this join must be executed serially
+ */
+ @Override
+ public boolean isSerialOperator() {
+ return joinOp == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN || joinOp == JoinOperator.RIGHT_OUTER_JOIN
+ || joinOp == JoinOperator.RIGHT_ANTI_JOIN || joinOp == JoinOperator.RIGHT_SEMI_JOIN
+ || joinOp == JoinOperator.FULL_OUTER_JOIN;
+ }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
index ae1d34308a38ed..3e3c49bf67524a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
@@ -341,6 +341,7 @@ public TPlanFragment toThrift() {
 // TODO chenhao , calculated by cost
 result.setMinReservationBytes(0);
 result.setInitialReservationTotalClaims(0);
+ result.setUseSerialSource(useSerialSource(ConnectContext.get()));
 return result;
 }
@@ -502,4 +503,38 @@ public void setBucketNum(int bucketNum) {
 public boolean hasNullAwareLeftAntiJoin() {
 return planRoot.isNullAwareLeftAntiJoin();
 }
+
+ private boolean isMergingFragment() {
+ return planRoot.isMerging();
+ }
+
+ public boolean useSerialSource(ConnectContext context) {
+ return context != null
+ && context.getSessionVariable().isIgnoreStorageDataDistribution()
+ && !hasNullAwareLeftAntiJoin()
+ // If input data partition is UNPARTITIONED and sink is DataStreamSink and root node is not a serial
+ // operator, we use local exchange to improve parallelism
+ && getDataPartition() == DataPartition.UNPARTITIONED && !children.isEmpty()
+ && sink instanceof DataStreamSink && !planRoot.isSerialOperator()
+ /**
+ * Suppose table `t1` has unique key `k1` and value column `v1`.
+ * The plan below is used to load data into `t1`:
+ * ```
+ * FRAGMENT 0:
+ * Merging Exchange (id = 1)
+ * NL Join (id = 2)
+ * DataStreamSender (id = 3, dst_id = 3) (TABLET_SINK_SHUFFLE_PARTITIONED)
+ *
+ * FRAGMENT 1:
+ * Exchange (id = 3)
+ * OlapTableSink (id = 4) ```
+ *
+ * In this plan, `Exchange (id = 1)` needs to do merge sort using column `k1` and `v1` so parallelism
+ * of FRAGMENT 0 must be 1 and data will be shuffled to FRAGMENT 1 which also has only 1 instance
+ * because this loading job relies on the global ordering of column `k1` and `v1`.
+ *
+ * So FRAGMENT 0 should not use serial source.
+ */
+ && !isMergingFragment();
+ }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index 1e9d5646939896..d1ba493682b4ee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -279,6 +279,10 @@ public boolean isNullAwareLeftAntiJoin() {
 return children.stream().anyMatch(PlanNode::isNullAwareLeftAntiJoin);
 }
+ public boolean isMerging() {
+ return children.stream().anyMatch(PlanNode::isMerging);
+ }
+
 public PlanFragment getFragment() {
 return fragment;
 }
@@ -639,6 +643,7 @@ private void treeToThriftHelper(TPlan container) {
 TPlanNode msg = new TPlanNode();
 msg.node_id = id.asInt();
 msg.setNereidsId(nereidsId);
+ msg.setIsSerialOperator(isSerialOperator());
 msg.num_children = children.size();
 msg.limit = limit;
 for (TupleId tid : tupleIds) {
@@ -1374,4 +1379,9 @@ public void foreachDownInCurrentFragment(Consumer visitor) {
 return true;
 });
 }
+
+ // Operators need to be executed serially. (e.g.
finalized agg without key) + public boolean isSerialOperator() { + return false; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java index 3c6a88cea08315..407d8a6444c219 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java @@ -200,4 +200,9 @@ public String getNodeExplainString(String detailPrefix, TExplainLevel detailLeve } return output.toString(); } + + @Override + public boolean isSerialOperator() { + return children.get(0).isSerialOperator(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java index a92cac7b510260..1681699d651ce8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java @@ -848,4 +848,9 @@ public long getSelectedPartitionNum() { public long getSelectedSplitNum() { return selectedSplitNum; } + + @Override + public boolean isSerialOperator() { + return true; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java index 6c6b665b00a51a..b3b088837a69f8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java @@ -109,4 +109,9 @@ public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { } return output.toString(); } + + @Override + public boolean isSerialOperator() { + return children.get(0).isSerialOperator(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java index e3c405bcbabf27..fc1c50c0bba2ef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java @@ -389,6 +389,11 @@ public Set computeInputSlotIds(Analyzer analyzer) throws NotImplementedE return new HashSet<>(result); } + @Override + public boolean isSerialOperator() { + return !isAnalyticSort && !mergeByexchange; + } + public void setColocate(boolean colocate) { isColocate = colocate; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java index 40982d07e771ed..bf48a770f1c879 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java @@ -42,4 +42,9 @@ public UnionNode(PlanNodeId id, TupleId tupleId, protected void toThrift(TPlanNode msg) { toThrift(msg, TPlanNodeType.UNION_NODE); } + + @Override + public boolean isSerialOperator() { + return children.isEmpty(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 8e580c549df05e..4eda6775b5c51d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1779,6 +1779,20 @@ protected void computeFragmentHosts() throws Exception { FInstanceExecParam instanceParam = new FInstanceExecParam(null, execHostport, 0, params); params.instanceExecParams.add(instanceParam); + + // Using serial source means a serial source operator will be used in this 
fragment (e.g. data will be
+ // shuffled to only 1 exchange operator) and then split by the following local exchanger
+ int expectedInstanceNum = fragment.getParallelExecNum();
+ boolean useSerialSource = fragment.useSerialSource(context) && useNereids
+ && fragment.queryCacheParam == null;
+ if (useSerialSource) {
+ for (int j = 1; j < expectedInstanceNum; j++) {
+ params.instanceExecParams.add(new FInstanceExecParam(
+ null, execHostport, 0, params));
+ }
+ params.ignoreDataDistribution = true;
+ params.parallelTasksNum = 1;
+ }
 continue;
 }
@@ -1808,6 +1822,10 @@ protected void computeFragmentHosts() throws Exception {
 if (leftMostNode.getNumInstances() == 1) {
 exchangeInstances = 1;
 }
+ // Using serial source means a serial source operator will be used in this fragment (e.g. data will be
+ // shuffled to only 1 exchange operator) and then split by the following local exchanger
+ boolean useSerialSource = fragment.useSerialSource(context) && useNereids
+ && fragment.queryCacheParam == null;
 if (exchangeInstances > 0 && fragmentExecParamsMap.get(inputFragmentId)
 .instanceExecParams.size() > exchangeInstances) {
 // random select some instance
@@ -1825,12 +1843,16 @@ protected void computeFragmentHosts() throws Exception {
 hosts.get(index % hosts.size()), 0, params);
 params.instanceExecParams.add(instanceParam);
 }
+ params.ignoreDataDistribution = useSerialSource;
+ params.parallelTasksNum = useSerialSource ? 1 : params.instanceExecParams.size();
 } else {
 for (FInstanceExecParam execParams : fragmentExecParamsMap.get(inputFragmentId).instanceExecParams) {
 FInstanceExecParam instanceParam = new FInstanceExecParam(null, execParams.host, 0, params);
 params.instanceExecParams.add(instanceParam);
 }
+ params.ignoreDataDistribution = useSerialSource;
+ params.parallelTasksNum = useSerialSource ? 1 : params.instanceExecParams.size();
 }
 // When group by cardinality is smaller than number of backend, only some backends always
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 5c0273da791be8..eb5266942c023a 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -1366,6 +1366,7 @@ struct TPlanNode {
 49: optional i64 push_down_count
 50: optional list> distribute_expr_lists
+ 51: optional bool is_serial_operator
 // projections is final projections, which means projecting into results and materializing them into the output block.
 101: optional list projections
 102: optional Types.TTupleId output_tuple_id
diff --git a/gensrc/thrift/Planner.thrift b/gensrc/thrift/Planner.thrift
index 866d8d45320243..ffcc33638db52c 100644
--- a/gensrc/thrift/Planner.thrift
+++ b/gensrc/thrift/Planner.thrift
@@ -64,6 +64,10 @@ struct TPlanFragment {
 8: optional i64 initial_reservation_total_claims
 9: optional QueryCache.TQueryCacheParam query_cache_param
+
+ // Using serial source means a serial source operator will be used in this fragment (e.g.
data will be shuffled to
+ // only 1 exchange operator) and then split by the following local exchanger
+ 10: optional bool use_serial_source
 }
 // location information for a single scan range
diff --git a/regression-test/suites/nereids_p0/insert_into_table/complex_insert.groovy b/regression-test/suites/nereids_p0/insert_into_table/complex_insert.groovy
index 2493a7df5de6d3..049cbe0b4d7273 100644
--- a/regression-test/suites/nereids_p0/insert_into_table/complex_insert.groovy
+++ b/regression-test/suites/nereids_p0/insert_into_table/complex_insert.groovy
@@ -177,15 +177,15 @@ suite('complex_insert') {
 sql 'insert into t1(id, c1, c2, c3) select id, c1 * 2, c2, c3 from t1'
 sql 'sync'
- qt_sql_1 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t3.id'
+ qt_sql_1 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t2.c1, t3.id'
 sql 'insert into t2(id, c1, c2, c3) select id, c1, c2 * 2, c3 from t2'
 sql 'sync'
- qt_sql_2 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t3.id'
+ qt_sql_2 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t2.c1, t3.id'
 sql 'insert into t2(c1, c3) select c1 + 1, c3 + 1 from (select id, c1, c3 from t1 order by id, c1 limit 10) t1, t3'
 sql 'sync'
- qt_sql_3 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t3.id'
+ qt_sql_3 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t2.c1, t3.id'
 sql 'drop table if exists agg_have_dup_base'
diff --git a/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy b/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy
index 997230b1a067b0..950b6171c7ca84 100644
--- a/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy
+++ b/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy
@@ -52,7 +52,7 @@ suite("local_shuffle") {
 set force_to_local_shuffle=true;
 """
- order_qt_read_single_olap_table "select * from test_local_shuffle1"
+ order_qt_read_single_olap_table "select * from test_local_shuffle1 order by id, id2"
 order_qt_broadcast_join """
 select *
@@ -96,7 +96,7 @@ suite("local_shuffle") {
 ) a
 right outer join [shuffle] test_local_shuffle2
- on a.id=test_local_shuffle2.id2
+ on a.id=test_local_shuffle2.id2 order by test_local_shuffle2.id, test_local_shuffle2.id2
 """
 order_qt_bucket_shuffle_with_prune_tablets2 """
@@ -109,7 +109,7 @@ suite("local_shuffle") {
 from test_local_shuffle1
 where id=1
 ) a
- on a.id=test_local_shuffle2.id2
+ on a.id=test_local_shuffle2.id2 order by test_local_shuffle2.id, test_local_shuffle2.id2
 """
 order_qt_bucket_shuffle_with_prune_tablets3 """
@@ -150,11 +150,11 @@ suite("local_shuffle") {
 """
 order_qt_fillup_bucket """
- SELECT cast(a.c0 as int), cast(b.c0 as int) FROM
+ SELECT cast(a.c0 as int), cast(b.c0 as int) res FROM
 (select * from test_local_shuffle3 where c0 =1)a
 RIGHT OUTER JOIN
 (select * from test_local_shuffle4)b
- ON a.c0 = b.c0
+ ON a.c0 = b.c0 order by res
 """
 multi_sql """
@@ -182,6 +182,6 @@ suite("local_shuffle") {
 ) a
 inner join [shuffle] test_shuffle_left_with_local_shuffle b
- on a.id2=b.id;
+ on a.id2=b.id order by a.id2;
 """
 }
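
To make the intent of the coordinator changes above easier to follow, here is a minimal, self-contained sketch of the serial-source decision: when a fragment reads UNPARTITIONED input, its root is not itself a serial operator, and no merging exchange is involved, the coordinator keeps the requested number of instances but collapses them to a single parallel task and lets the local exchanger re-split the data afterwards. The class and field names below (FragmentShape, SerialSourceDecision, and so on) are illustrative only and do not exist in the Doris code base; the logic is a simplified reading of useSerialSource() and computeFragmentHosts() in this patch, not a drop-in implementation.

// Illustrative sketch only: these classes are hypothetical and are not part of the Doris code base.
final class FragmentShape {
    final boolean rootIsSerialOperator;   // e.g. a final agg without GROUP BY keys at the fragment root
    final boolean isMergingFragment;      // the root depends on a merging exchange (global ordering)
    final boolean unpartitionedInput;     // the fragment's input data partition is UNPARTITIONED
    final int expectedInstanceNum;        // parallelism requested by the planner

    FragmentShape(boolean rootIsSerialOperator, boolean isMergingFragment,
                  boolean unpartitionedInput, int expectedInstanceNum) {
        this.rootIsSerialOperator = rootIsSerialOperator;
        this.isMergingFragment = isMergingFragment;
        this.unpartitionedInput = unpartitionedInput;
        this.expectedInstanceNum = expectedInstanceNum;
    }
}

final class SerialSourceDecision {
    final int instanceCount;              // instances created on the chosen host
    final int parallelTasksNum;           // tasks that actually read from the source
    final boolean ignoreDataDistribution; // let a local exchange re-split the data afterwards

    SerialSourceDecision(int instanceCount, int parallelTasksNum, boolean ignoreDataDistribution) {
        this.instanceCount = instanceCount;
        this.parallelTasksNum = parallelTasksNum;
        this.ignoreDataDistribution = ignoreDataDistribution;
    }

    // Simplified reading of useSerialSource() + computeFragmentHosts(): a serial source keeps the
    // requested instance count, but only one task pulls from the source; the local exchanger behind
    // it restores parallelism for the rest of the fragment.
    static SerialSourceDecision decide(FragmentShape f) {
        boolean useSerialSource = f.unpartitionedInput && !f.rootIsSerialOperator && !f.isMergingFragment;
        if (useSerialSource) {
            return new SerialSourceDecision(f.expectedInstanceNum, 1, true);
        }
        return new SerialSourceDecision(f.expectedInstanceNum, f.expectedInstanceNum, false);
    }
}

public class SerialSourceExample {
    public static void main(String[] args) {
        // An exchange-fed fragment with 8 requested instances and no merge sort at its root.
        SerialSourceDecision d = SerialSourceDecision.decide(new FragmentShape(false, false, true, 8));
        System.out.printf("instances=%d, parallelTasksNum=%d, ignoreDataDistribution=%b%n",
                d.instanceCount, d.parallelTasksNum, d.ignoreDataDistribution);
        // Prints: instances=8, parallelTasksNum=1, ignoreDataDistribution=true
    }
}

Under these assumptions the fragment still occupies the requested execution slots, but only one of them performs the serial work; this is also why the backend side of the patch threads the parent pipeline's task count into Pipeline and skips adding a local exchange when num_tasks_of_parent() is 1.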