diff --git a/be/src/exec/pipeline/table_function_operator.cpp b/be/src/exec/pipeline/table_function_operator.cpp index 3a66aa3ce4222..44e6522be9869 100644 --- a/be/src/exec/pipeline/table_function_operator.cpp +++ b/be/src/exec/pipeline/table_function_operator.cpp @@ -95,8 +95,13 @@ Status TableFunctionOperator::prepare(RuntimeState* state) { if (_table_function == nullptr) { return Status::InternalError("can't find table function " + table_function_name); } + if (_tnode.table_function_node.__isset.fn_result_required) { + _fn_result_required = _tnode.table_function_node.fn_result_required; + } else { + _fn_result_required = true; + } RETURN_IF_ERROR(_table_function->init(table_fn, &_table_function_state)); - + _table_function_state->set_is_required(_fn_result_required); _table_function_exec_timer = ADD_TIMER(_unique_metrics, "TableFunctionExecTime"); _table_function_exec_counter = ADD_COUNTER(_unique_metrics, "TableFunctionExecCount", TUnit::UNIT); RETURN_IF_ERROR(_table_function->prepare(_table_function_state)); @@ -159,8 +164,11 @@ ChunkPtr TableFunctionOperator::_build_chunk(const std::vector& colum for (size_t i = 0; i < _outer_slots.size(); ++i) { chunk->append_column(columns[i], _outer_slots[i]); } - for (size_t i = 0; i < _fn_result_slots.size(); ++i) { - chunk->append_column(columns[_outer_slots.size() + i], _fn_result_slots[i]); + + if (_fn_result_required) { + for (size_t i = 0; i < _fn_result_slots.size(); ++i) { + chunk->append_column(columns[_outer_slots.size() + i], _fn_result_slots[i]); + } } return chunk; @@ -222,8 +230,10 @@ void TableFunctionOperator::_copy_result(const std::vector& columns, } // Build table function result - for (size_t i = 0; i < _fn_result_slots.size(); ++i) { - columns[_outer_slots.size() + i]->append(*(fn_result_cols[i]), start, copy_rows); + if (_fn_result_required) { + for (size_t i = 0; i < _fn_result_slots.size(); ++i) { + columns[_outer_slots.size() + i]->append(*(fn_result_cols[i]), start, copy_rows); + } } } diff --git a/be/src/exec/pipeline/table_function_operator.h b/be/src/exec/pipeline/table_function_operator.h index 52d267c9c82e6..7905c04eaaaa9 100644 --- a/be/src/exec/pipeline/table_function_operator.h +++ b/be/src/exec/pipeline/table_function_operator.h @@ -73,6 +73,7 @@ class TableFunctionOperator final : public Operator { size_t _next_output_row_offset = 0; // table function result std::pair _table_function_result; + bool _fn_result_required = true; // table function param and return offset TableFunctionState* _table_function_state = nullptr; diff --git a/be/src/exprs/table_function/table_function.h b/be/src/exprs/table_function/table_function.h index 9ae6252806b7e..fd081708033a6 100644 --- a/be/src/exprs/table_function/table_function.h +++ b/be/src/exprs/table_function/table_function.h @@ -59,6 +59,10 @@ class TableFunctionState { [[nodiscard]] const Status& status() const { return _status; } + void set_is_required(bool is_required) { _is_required = is_required; } + + bool is_required() { return _is_required; } + private: virtual void on_new_params(){}; @@ -79,6 +83,7 @@ class TableFunctionState { // used to identify left join for table function bool _is_left_join = false; + bool _is_required = true; }; class TableFunction { diff --git a/be/src/exprs/table_function/unnest.h b/be/src/exprs/table_function/unnest.h index d170a16c30069..43b3491e028bd 100644 --- a/be/src/exprs/table_function/unnest.h +++ b/be/src/exprs/table_function/unnest.h @@ -42,15 +42,15 @@ class Unnest final : public TableFunction { auto offset_column = col_array->offsets_column(); auto copy_count_column = UInt32Column::create(); copy_count_column->append(0); - ColumnPtr unnested_array_elements = col_array->elements_column()->clone_empty(); - uint32_t offset = 0; for (int row_idx = 0; row_idx < arg0->size(); ++row_idx) { if (arg0->is_null(row_idx)) { if (state->get_is_left_join()) { // to support unnest with null. - unnested_array_elements->append_nulls(1); + if (state->is_required()) { + unnested_array_elements->append_nulls(1); + } offset += 1; } copy_count_column->append(offset); @@ -58,13 +58,17 @@ class Unnest final : public TableFunction { if (offset_column->get(row_idx + 1).get_int32() == offset_column->get(row_idx).get_int32() && state->get_is_left_join()) { // to support unnest with null. - unnested_array_elements->append_nulls(1); + if (state->is_required()) { + unnested_array_elements->append_nulls(1); + } offset += 1; } else { auto length = offset_column->get(row_idx + 1).get_int32() - offset_column->get(row_idx).get_int32(); - unnested_array_elements->append(*(col_array->elements_column()), - offset_column->get(row_idx).get_int32(), length); + if (state->is_required()) { + unnested_array_elements->append(*(col_array->elements_column()), + offset_column->get(row_idx).get_int32(), length); + } offset += length; } copy_count_column->append(offset); diff --git a/fe/fe-core/src/main/java/com/starrocks/planner/TableFunctionNode.java b/fe/fe-core/src/main/java/com/starrocks/planner/TableFunctionNode.java index 94395ffb7433e..98550aecf22e8 100644 --- a/fe/fe-core/src/main/java/com/starrocks/planner/TableFunctionNode.java +++ b/fe/fe-core/src/main/java/com/starrocks/planner/TableFunctionNode.java @@ -28,21 +28,25 @@ import com.starrocks.thrift.TTypeDesc; import java.util.List; +import java.util.stream.Collectors; public class TableFunctionNode extends PlanNode { private final TableFunction tableFunction; + //Slots of output by table function private final List fnResultSlots; + //External column slots of the join logic generated by the table function private final List outerSlots; //Slots of table function input parameters private final List paramSlots; - + private final boolean fnResultRequired; public TableFunctionNode(PlanNodeId id, PlanNode child, TupleDescriptor outputTupleDesc, TableFunction tableFunction, List paramSlots, List outerSlots, - List fnResultSlots) { + List fnResultSlots, + boolean fnResultRequired) { super(id, "TableValueFunction"); this.children.add(child); this.tableFunction = tableFunction; @@ -50,6 +54,7 @@ public TableFunctionNode(PlanNodeId id, PlanNode child, TupleDescriptor outputTu this.paramSlots = paramSlots; this.outerSlots = outerSlots; this.fnResultSlots = fnResultSlots; + this.fnResultRequired = fnResultRequired; this.tupleIds.add(outputTupleDesc.getId()); } @@ -69,6 +74,7 @@ protected void toThrift(TPlanNode msg) { msg.table_function_node.setParam_columns(paramSlots); msg.table_function_node.setOuter_columns(outerSlots); msg.table_function_node.setFn_result_columns(fnResultSlots); + msg.table_function_node.setFn_result_required(fnResultRequired); } @Override @@ -77,6 +83,7 @@ protected String getNodeExplainString(String prefix, TExplainLevel detailLevel) output.append(prefix).append("tableFunctionName: ").append(tableFunction.getFunctionName()).append('\n'); output.append(prefix).append("columns: ").append(tableFunction.getDefaultColumnNames()).append('\n'); output.append(prefix).append("returnTypes: ").append(tableFunction.getTableFnReturnTypes()).append('\n'); + return output.toString(); } @@ -101,4 +108,8 @@ protected void toNormalForm(TNormalPlanNode planNode, FragmentNormalizer normali public boolean needCollectExecStats() { return true; } + + public boolean isFnResultRequired() { + return fnResultRequired; + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/plan/PlanFragmentBuilder.java b/fe/fe-core/src/main/java/com/starrocks/sql/plan/PlanFragmentBuilder.java index 1ac67ab3dc730..25f66894795d9 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/plan/PlanFragmentBuilder.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/plan/PlanFragmentBuilder.java @@ -3350,6 +3350,12 @@ public PlanFragment visitPhysicalTableFunction(OptExpression optExpression, Exec } udtfOutputTuple.computeMemLayout(); + ColumnRefSet fnResultsRequired = ColumnRefSet.of(); + optExpression.getRowOutputInfo().getColumnRefMap().values() + .forEach(expr -> fnResultsRequired.union(expr.getUsedColumns())); + Optional.ofNullable(physicalTableFunction.getPredicate()) + .ifPresent(pred -> fnResultsRequired.union(pred.getUsedColumns())); + fnResultsRequired.intersect(physicalTableFunction.getFnResultColRefs()); TableFunctionNode tableFunctionNode = new TableFunctionNode(context.getNextNodeId(), inputFragment.getPlanRoot(), udtfOutputTuple, @@ -3359,8 +3365,10 @@ public PlanFragment visitPhysicalTableFunction(OptExpression optExpression, Exec physicalTableFunction.getOuterColRefs().stream().map(ColumnRefOperator::getId) .collect(Collectors.toList()), physicalTableFunction.getFnResultColRefs().stream().map(ColumnRefOperator::getId) - .collect(Collectors.toList()) + .collect(Collectors.toList()), + !fnResultsRequired.isEmpty() ); + tableFunctionNode.computeStatistics(optExpression.getStatistics()); tableFunctionNode.setLimit(physicalTableFunction.getLimit()); currentExecGroup.add(tableFunctionNode); diff --git a/fe/fe-core/src/test/java/com/starrocks/sql/plan/TableFunctionTest.java b/fe/fe-core/src/test/java/com/starrocks/sql/plan/TableFunctionTest.java index 63aef555095a0..08d04fc51ec2e 100644 --- a/fe/fe-core/src/test/java/com/starrocks/sql/plan/TableFunctionTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/sql/plan/TableFunctionTest.java @@ -14,10 +14,12 @@ package com.starrocks.sql.plan; +import com.starrocks.planner.TableFunctionNode; import com.starrocks.sql.analyzer.SemanticException; import org.junit.Assert; import org.junit.Test; +import java.util.Optional; public class TableFunctionTest extends PlanTestBase { @Test @@ -290,4 +292,34 @@ public void testUnnesetBitmapToArrayToUnnestBitmapRewrite() throws Exception { PlanTestBase.assertContains(plan, "tableFunctionName: unnest_bitmap"); PlanTestBase.assertNotContains(plan, "bitmap_to_array"); } + + @Test + public void testUnnesetFnResultNotRequired() throws Exception { + Object[][] testCaseList = new Object[][] { + { + "select t.* from test_all_type t, unnest(split(t1a, ','))", + false + }, + { + "select t.*, unnest from test_all_type t, unnest(split(t1a, ','))", + true + } + }; + + for (Object[] tc : testCaseList) { + String sql = (String) tc[0]; + Boolean isRequired = (Boolean) tc[1]; + System.out.println(sql); + ExecPlan plan = getExecPlan(sql); + + Optional optTableFuncNode = plan.getFragments() + .stream() + .flatMap(fragment -> fragment.collectNodes().stream()) + .filter(planNode -> planNode instanceof TableFunctionNode) + .map(planNode -> (TableFunctionNode) planNode) + .findFirst(); + Assert.assertTrue(optTableFuncNode.isPresent()); + Assert.assertEquals(optTableFuncNode.get().isFnResultRequired(), isRequired); + } + } } diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 499d19d602d58..bf3a645f85b2d 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -1213,6 +1213,7 @@ struct TTableFunctionNode { 2: optional list param_columns 3: optional list outer_columns 4: optional list fn_result_columns + 5: optional bool fn_result_required } struct TConnectorScanNode {