
Commit ce9ba8a

Merge branch 'main' into fix_outer_join
viirya authored Jun 17, 2024
2 parents 764fbc4 + e1cfb48 commit ce9ba8a
Showing 140 changed files with 4,132 additions and 4,141 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
@@ -52,7 +52,7 @@ homepage = "https://datafusion.apache.org"
license = "Apache-2.0"
readme = "README.md"
repository = "https://github.com/apache/datafusion"
rust-version = "1.73"
rust-version = "1.75"
version = "39.0.0"

[workspace.dependencies]
@@ -107,7 +107,7 @@ doc-comment = "0.3"
env_logger = "0.11"
futures = "0.3"
half = { version = "2.2.1", default-features = false }
hashbrown = { version = "0.14", features = ["raw"] }
hashbrown = { version = "0.14.5", features = ["raw"] }
indexmap = "2.0.0"
itertools = "0.12"
log = "^0.4"
3 changes: 2 additions & 1 deletion benchmarks/.gitignore
@@ -1,2 +1,3 @@
data
-results
+results
+venv
24 changes: 18 additions & 6 deletions benchmarks/bench.sh
@@ -37,6 +37,7 @@ DATA_DIR=${DATA_DIR:-$SCRIPT_DIR/data}
#CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --release"}
CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --profile release-nonlto"} # for faster iterations
PREFER_HASH_JOIN=${PREFER_HASH_JOIN:-true}
+VIRTUAL_ENV=${VIRTUAL_ENV:-$SCRIPT_DIR/venv}

usage() {
echo "
@@ -46,6 +47,7 @@ Usage:
$0 data [benchmark]
$0 run [benchmark]
$0 compare <branch1> <branch2>
+$0 venv
**********
Examples:
@@ -62,6 +64,7 @@ DATAFUSION_DIR=/source/datafusion ./bench.sh run tpch
data: Generates or downloads data needed for benchmarking
run: Runs the named benchmark
compare: Compares results from benchmark runs
+venv: Creates new venv (unless already exists) and installs compare's requirements into it
**********
* Benchmarks
@@ -84,7 +87,8 @@ DATA_DIR directory to store datasets
CARGO_COMMAND command that runs the benchmark binary
DATAFUSION_DIR directory to use (default $DATAFUSION_DIR)
RESULTS_NAME folder where the benchmark files are stored
-PREFER_HASH_JOIN Prefer hash join algorithm(default true)
+PREFER_HASH_JOIN Prefer hash join algorithm (default true)
+VENV_PATH Python venv to use for compare and venv commands (default ./venv, override by <your-venv>/bin/activate)
"
exit 1
}
@@ -243,6 +247,9 @@ main() {
compare)
compare_benchmarks "$ARG2" "$ARG3"
;;
+venv)
+setup_venv
+;;
"")
usage
;;
@@ -302,7 +309,7 @@ data_tpch() {
else
echo " creating parquet files using benchmark binary ..."
pushd "${SCRIPT_DIR}" > /dev/null
-$CARGO_COMMAND --bin tpch -- convert --input "${TPCH_DIR}" --prefer_hash_join ${PREFER_HASH_JOIN} --output "${TPCH_DIR}" --format parquet
+$CARGO_COMMAND --bin tpch -- convert --input "${TPCH_DIR}" --output "${TPCH_DIR}" --format parquet
popd > /dev/null
fi
}
@@ -405,23 +412,23 @@ run_clickbench_1() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_1.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (1 file) benchmark..."
-$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --prefer_hash_join ${PREFER_HASH_JOIN} --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
+$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
}

# Runs the clickbench benchmark with the partitioned parquet files
run_clickbench_partitioned() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_partitioned.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (partitioned, 100 files) benchmark..."
-$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --prefer_hash_join ${PREFER_HASH_JOIN} --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
+$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
}

# Runs the clickbench "extended" benchmark with a single large parquet file
run_clickbench_extended() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_extended.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (1 file) extended benchmark..."
-$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --prefer_hash_join ${PREFER_HASH_JOIN} --queries-path "${SCRIPT_DIR}/queries/clickbench/extended.sql" -o ${RESULTS_FILE}
+$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/extended.sql" -o ${RESULTS_FILE}
}

compare_benchmarks() {
@@ -448,13 +455,18 @@ compare_benchmarks() {
echo "--------------------"
echo "Benchmark ${bench}"
echo "--------------------"
python3 "${SCRIPT_DIR}"/compare.py "${RESULTS_FILE1}" "${RESULTS_FILE2}"
PATH=$VIRTUAL_ENV/bin:$PATH python3 "${SCRIPT_DIR}"/compare.py "${RESULTS_FILE1}" "${RESULTS_FILE2}"
else
echo "Note: Skipping ${RESULTS_FILE1} as ${RESULTS_FILE2} does not exist"
fi
done

}

+setup_venv() {
+python3 -m venv $VIRTUAL_ENV
+PATH=$VIRTUAL_ENV/bin:$PATH python3 -m pip install -r requirements.txt
+}

# And start the process up
main
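As a side note, a minimal sketch of the workflow these bench.sh changes enable (branch names are hypothetical, and benchmark results for both branches are assumed to already exist under the results directory):

    # One-time setup: create the Python venv and install compare.py's
    # dependencies (the `rich` package from benchmarks/requirements.txt)
    ./bench.sh venv

    # Compare previously recorded benchmark results for two branches
    ./bench.sh compare main fix_outer_join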
2 changes: 1 addition & 1 deletion benchmarks/compare.py
@@ -29,7 +29,7 @@
from rich.console import Console
from rich.table import Table
except ImportError:
print("Try `pip install rich` for using this script.")
print("Couldn't import modules -- run `./bench.sh venv` first")
raise


18 changes: 18 additions & 0 deletions benchmarks/requirements.txt
@@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

rich
2 changes: 2 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
@@ -26,7 +26,7 @@ license = "Apache-2.0"
homepage = "https://datafusion.apache.org"
repository = "https://github.com/apache/datafusion"
# Specify MSRV here as `cargo msrv` doesn't support workspace version
rust-version = "1.73"
rust-version = "1.75"
readme = "README.md"

[dependencies]
97 changes: 97 additions & 0 deletions datafusion-cli/examples/cli-session-context.rs
@@ -0,0 +1,97 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Shows an example of a custom session context that unions the input plan with itself.
//! To run this example, use `cargo run --example cli-session-context` from within the `datafusion-cli` directory.
use std::sync::Arc;

use datafusion::{
dataframe::DataFrame,
error::DataFusionError,
execution::{context::SessionState, TaskContext},
logical_expr::{LogicalPlan, LogicalPlanBuilder},
prelude::SessionContext,
};
use datafusion_cli::{
cli_context::CliSessionContext, exec::exec_from_repl, print_options::PrintOptions,
};
use object_store::ObjectStore;

/// This is a toy example of a custom session context that unions the input plan with itself.
struct MyUnionerContext {
ctx: SessionContext,
}

impl Default for MyUnionerContext {
fn default() -> Self {
Self {
ctx: SessionContext::new(),
}
}
}

#[async_trait::async_trait]
impl CliSessionContext for MyUnionerContext {
fn task_ctx(&self) -> Arc<TaskContext> {
self.ctx.task_ctx()
}

fn session_state(&self) -> SessionState {
self.ctx.state()
}

fn register_object_store(
&self,
url: &url::Url,
object_store: Arc<dyn ObjectStore>,
) -> Option<Arc<dyn ObjectStore + 'static>> {
self.ctx.register_object_store(url, object_store)
}

fn register_table_options_extension_from_scheme(&self, _scheme: &str) {
unimplemented!()
}

async fn execute_logical_plan(
&self,
plan: LogicalPlan,
) -> Result<DataFrame, DataFusionError> {
let new_plan = LogicalPlanBuilder::from(plan.clone())
.union(plan.clone())?
.build()?;

self.ctx.execute_logical_plan(new_plan).await
}
}

#[tokio::main]
/// Runs the example.
pub async fn main() {
let mut my_ctx = MyUnionerContext::default();

let mut print_options = PrintOptions {
format: datafusion_cli::print_format::PrintFormat::Automatic,
quiet: false,
maxrows: datafusion_cli::print_options::MaxRows::Unlimited,
color: true,
};

exec_from_repl(&mut my_ctx, &mut print_options)
.await
.unwrap();
}
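To try the example above, the doc comment's instruction amounts to the following (run from the repository root):

    cd datafusion-cli
    cargo run --example cli-session-context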
98 changes: 98 additions & 0 deletions datafusion-cli/src/cli_context.rs
@@ -0,0 +1,98 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use datafusion::{
dataframe::DataFrame,
error::DataFusionError,
execution::{context::SessionState, TaskContext},
logical_expr::LogicalPlan,
prelude::SessionContext,
};
use object_store::ObjectStore;

use crate::object_storage::{AwsOptions, GcpOptions};

#[async_trait::async_trait]
/// The CLI session context trait provides a way to have a session context that can be used with datafusion's CLI code.
pub trait CliSessionContext {
/// Get an atomic reference counted task context.
fn task_ctx(&self) -> Arc<TaskContext>;

/// Get the session state.
fn session_state(&self) -> SessionState;

/// Register an object store with the session context.
fn register_object_store(
&self,
url: &url::Url,
object_store: Arc<dyn ObjectStore>,
) -> Option<Arc<dyn ObjectStore + 'static>>;

/// Register table options extension from scheme.
fn register_table_options_extension_from_scheme(&self, scheme: &str);

/// Execute a logical plan and return a DataFrame.
async fn execute_logical_plan(
&self,
plan: LogicalPlan,
) -> Result<DataFrame, DataFusionError>;
}

#[async_trait::async_trait]
impl CliSessionContext for SessionContext {
fn task_ctx(&self) -> Arc<TaskContext> {
self.task_ctx()
}

fn session_state(&self) -> SessionState {
self.state()
}

fn register_object_store(
&self,
url: &url::Url,
object_store: Arc<dyn ObjectStore>,
) -> Option<Arc<dyn ObjectStore + 'static>> {
self.register_object_store(url, object_store)
}

fn register_table_options_extension_from_scheme(&self, scheme: &str) {
match scheme {
// For Amazon S3 or Alibaba Cloud OSS
"s3" | "oss" | "cos" => {
// Register AWS specific table options in the session context:
self.register_table_options_extension(AwsOptions::default())
}
// For Google Cloud Storage
"gs" | "gcs" => {
// Register GCP specific table options in the session context:
self.register_table_options_extension(GcpOptions::default())
}
// For unsupported schemes, do nothing:
_ => {}
}
}

async fn execute_logical_plan(
&self,
plan: LogicalPlan,
) -> Result<DataFrame, DataFusionError> {
self.execute_logical_plan(plan).await
}
}
4 changes: 2 additions & 2 deletions datafusion-cli/src/command.rs
@@ -17,6 +17,7 @@

//! Command within CLI
+use crate::cli_context::CliSessionContext;
use crate::exec::{exec_and_print, exec_from_lines};
use crate::functions::{display_all_functions, Function};
use crate::print_format::PrintFormat;
@@ -28,7 +29,6 @@ use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::exec_err;
use datafusion::common::instant::Instant;
use datafusion::error::{DataFusionError, Result};
-use datafusion::prelude::SessionContext;
use std::fs::File;
use std::io::BufReader;
use std::str::FromStr;
@@ -55,7 +55,7 @@ pub enum OutputFormat {
impl Command {
pub async fn execute(
&self,
ctx: &mut SessionContext,
ctx: &mut dyn CliSessionContext,
print_options: &mut PrintOptions,
) -> Result<()> {
match self {
(diff truncated; the remaining changed files in this commit are not shown)