Skip to content

Commit

Permalink
Use implicit_ordering for asof_join rather than require_sequenced_output
Browse files Browse the repository at this point in the history
  • Loading branch information
EnricoMi committed Nov 1, 2024
1 parent 567f9c5 commit 0b710a1
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 18 deletions.
4 changes: 2 additions & 2 deletions cpp/src/arrow/acero/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ class ARROW_ACERO_EXPORT SourceNodeOptions : public ExecNodeOptions {
std::shared_ptr<Schema> output_schema;
/// \brief an asynchronous stream of batches ending with std::nullopt
std::function<Future<std::optional<ExecBatch>>()> generator;

Ordering ordering = Ordering::Unordered();
/// \brief the order of the data, defaults to Ordering::Unordered
Ordering ordering;
};

/// \brief a node that generates data from a table already loaded in memory
Expand Down
10 changes: 5 additions & 5 deletions cpp/src/arrow/dataset/scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,7 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* plan,
auto scan_options = scan_node_options.scan_options;
auto dataset = scan_node_options.dataset;
bool require_sequenced_output = scan_node_options.require_sequenced_output;
bool implicit_ordering = scan_node_options.implicit_ordering;

RETURN_NOT_OK(NormalizeScanOptions(scan_options, dataset->schema()));

Expand Down Expand Up @@ -1032,11 +1033,11 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* plan,
} else {
batch_gen = std::move(merged_batch_gen);
}
int64_t index = require_sequenced_output ? 0 : compute::kUnsequencedIndex;

auto gen = MakeMappedGenerator(
std::move(batch_gen),
[scan_options, index](const EnumeratedRecordBatch& partial) mutable
-> Result<std::optional<compute::ExecBatch>> {
[scan_options](const EnumeratedRecordBatch& partial)
-> Result<std::optional<compute::ExecBatch>> {
// TODO(ARROW-13263) fragments may be able to attach more guarantees to batches
// than this, for example parquet's row group stats. Failing to do this leaves
// perf on the table because row group stats could be used to skip kernel execs in
Expand All @@ -1057,11 +1058,10 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* plan,
batch->values.emplace_back(partial.record_batch.index);
batch->values.emplace_back(partial.record_batch.last);
batch->values.emplace_back(partial.fragment.value->ToString());
if (index != compute::kUnsequencedIndex) batch->index = index++;
return batch;
});

auto ordering = require_sequenced_output ? Ordering::Implicit() : Ordering::Unordered();
auto ordering = implicit_ordering ? Ordering::Implicit() : Ordering::Unordered();

auto fields = scan_options->dataset_schema->fields();
if (scan_options->add_augmented_fields) {
Expand Down
7 changes: 5 additions & 2 deletions cpp/src/arrow/dataset/scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -563,14 +563,17 @@ class ARROW_DS_EXPORT ScanNodeOptions : public acero::ExecNodeOptions {
public:
explicit ScanNodeOptions(std::shared_ptr<Dataset> dataset,
std::shared_ptr<ScanOptions> scan_options,
bool require_sequenced_output = false)
bool require_sequenced_output = false,
bool implicit_ordering = false)
: dataset(std::move(dataset)),
scan_options(std::move(scan_options)),
require_sequenced_output(require_sequenced_output) {}
require_sequenced_output(require_sequenced_output),
implicit_ordering(implicit_ordering) {}

std::shared_ptr<Dataset> dataset;
std::shared_ptr<ScanOptions> scan_options;
bool require_sequenced_output;
bool implicit_ordering;
};

/// @}
Expand Down
12 changes: 8 additions & 4 deletions python/pyarrow/_dataset.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -4068,13 +4068,15 @@ cdef class _ScanNodeOptions(ExecNodeOptions):
cdef:
shared_ptr[CScanOptions] c_scan_options
bint require_sequenced_output=False
bint implicit_ordering=False

c_scan_options = Scanner._make_scan_options(dataset, scan_options)

require_sequenced_output=scan_options.get("require_sequenced_output", False)
implicit_ordering=scan_options.get("implicit_ordering", False)

self.wrapped.reset(
new CScanNodeOptions(dataset.unwrap(), c_scan_options, require_sequenced_output)
new CScanNodeOptions(dataset.unwrap(), c_scan_options, require_sequenced_output, implicit_ordering)
)


Expand All @@ -4092,8 +4094,8 @@ class ScanNodeOptions(_ScanNodeOptions):
expression or projection to the scan node that you also supply
to the filter or project node.
Yielded batches will be augmented with fragment/batch indices to
enable stable ordering for simple ExecPlans.
Yielded batches will be augmented with fragment/batch indices when
implicit_ordering=True to enable stable ordering for simple ExecPlans.
Parameters
----------
Expand All @@ -4102,7 +4104,9 @@ class ScanNodeOptions(_ScanNodeOptions):
**kwargs : dict, optional
Scan options. See `Scanner.from_dataset` for possible arguments.
require_sequenced_output : bool, default False
Assert implicit ordering on data.
Batches are yielded sequentially, as in single-threaded execution
implicit_ordering : bool, default False
Preserve implicit ordering of data.
"""

def __init__(self, Dataset dataset, **kwargs):
Expand Down
8 changes: 4 additions & 4 deletions python/pyarrow/acero.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ class InMemoryDataset:
ds = DatasetModuleStub


def _dataset_to_decl(dataset, use_threads=True, require_sequenced_output=False):
def _dataset_to_decl(dataset, use_threads=True, implicit_ordering=False):
decl = Declaration("scan", ScanNodeOptions(
dataset, use_threads=use_threads,
require_sequenced_output=require_sequenced_output))
implicit_ordering=implicit_ordering))

# Get rid of special dataset columns
# "__fragment_index", "__batch_index", "__last_in_fragment", "__filename"
Expand Down Expand Up @@ -316,15 +316,15 @@ def _perform_join_asof(left_operand, left_on, left_by,
left_source = _dataset_to_decl(
left_operand,
use_threads=use_threads,
require_sequenced_output=True)
implicit_ordering=True)
else:
left_source = Declaration(
"table_source", TableSourceNodeOptions(left_operand),
)
if isinstance(right_operand, ds.Dataset):
right_source = _dataset_to_decl(
right_operand, use_threads=use_threads,
require_sequenced_output=True)
implicit_ordering=True)
else:
right_source = Declaration(
"table_source", TableSourceNodeOptions(right_operand)
Expand Down
2 changes: 1 addition & 1 deletion python/pyarrow/includes/libarrow_dataset.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
CExpression filter

cdef cppclass CScanNodeOptions "arrow::dataset::ScanNodeOptions"(CExecNodeOptions):
CScanNodeOptions(shared_ptr[CDataset] dataset, shared_ptr[CScanOptions] scan_options, bint require_sequenced_output)
CScanNodeOptions(shared_ptr[CDataset] dataset, shared_ptr[CScanOptions] scan_options, bint require_sequenced_output, bint implicit_ordering)

shared_ptr[CScanOptions] scan_options

Expand Down

0 comments on commit 0b710a1

Please sign in to comment.