diff --git a/.github/workflows/Rest.yml b/.github/workflows/Rest.yml
index 11560e9..cb30450 100644
--- a/.github/workflows/Rest.yml
+++ b/.github/workflows/Rest.yml
@@ -50,10 +50,6 @@ jobs:
         run: |
           make release
 
-      - name: Start Rest Catalog
-        run: |
-          make start-rest-catalog
-
       - name: Generate data
         run: |
           make data
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 37fb76f..0f27055 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -18,6 +18,7 @@ set(EXTENSION_SOURCES
     src/common/utils.cpp
     src/common/schema.cpp
     src/common/iceberg.cpp
+    src/iceberg_functions/iceberg_multi_file_reader.cpp
     src/iceberg_functions/iceberg_snapshots.cpp
     src/iceberg_functions/iceberg_scan.cpp
     src/iceberg_functions/iceberg_metadata.cpp
diff --git a/duckdb b/duckdb
index f99785b..5f5512b 160000
--- a/duckdb
+++ b/duckdb
@@ -1 +1 @@
-Subproject commit f99785b78ae4724b31d9b41435ad8c17e57ee8f4
+Subproject commit 5f5512b827df6397afd31daedb4bbdee76520019
diff --git a/extension-ci-tools b/extension-ci-tools
index 0cd9d80..f5594c6 160000
--- a/extension-ci-tools
+++ b/extension-ci-tools
@@ -1 +1 @@
-Subproject commit 0cd9d804a40f256e90170c1c3c9922734e198e3f
+Subproject commit f5594c61803daee122a5245afb817966e1a4545c
diff --git a/extension_config.cmake b/extension_config.cmake
index e29924d..4490508 100644
--- a/extension_config.cmake
+++ b/extension_config.cmake
@@ -6,4 +6,4 @@ duckdb_extension_load(iceberg
     LOAD_TESTS
 )
 
-duckdb_extension_load(tpch)
\ No newline at end of file
+duckdb_extension_load(tpch)
diff --git a/scripts/data_generators/generate_spark_local/lineitem_001_deletes/lineitem_001_deletes/q00.sql b/scripts/data_generators/generate_spark_local/lineitem_001_deletes/lineitem_001_deletes/q00.sql
new file mode 100644
index 0000000..959157d
--- /dev/null
+++ b/scripts/data_generators/generate_spark_local/lineitem_001_deletes/lineitem_001_deletes/q00.sql
@@ -0,0 +1,6 @@
+CREATE or REPLACE TABLE iceberg_catalog.lineitem_001_deletes
+    TBLPROPERTIES (
+        'format-version'='2',
+        'write.update.mode'='merge-on-read'
+    )
+AS SELECT * FROM parquet_file_view;
\ No newline at end of file
diff --git a/scripts/data_generators/generate_spark_local/lineitem_001_deletes/lineitem_001_deletes/q01.sql b/scripts/data_generators/generate_spark_local/lineitem_001_deletes/lineitem_001_deletes/q01.sql
new file mode 100644
index 0000000..2894b39
--- /dev/null
+++ b/scripts/data_generators/generate_spark_local/lineitem_001_deletes/lineitem_001_deletes/q01.sql
@@ -0,0 +1,11 @@
+update iceberg_catalog.lineitem_001_deletes
+set l_orderkey=NULL,
+    l_partkey=NULL,
+    l_suppkey=NULL,
+    l_linenumber=NULL,
+    l_quantity=NULL,
+    l_extendedprice=NULL,
+    l_discount=NULL,
+    l_shipdate=NULL,
+    l_comment=NULL
+where l_partkey % 2 = 0;
\ No newline at end of file
diff --git a/scripts/data_generators/generate_spark_local/lineitem_001_deletes/lineitem_001_deletes/setup.py b/scripts/data_generators/generate_spark_local/lineitem_001_deletes/lineitem_001_deletes/setup.py
new file mode 100644
index 0000000..df29dff
--- /dev/null
+++ b/scripts/data_generators/generate_spark_local/lineitem_001_deletes/lineitem_001_deletes/setup.py
@@ -0,0 +1,8 @@
+import duckdb
+import os
+
+PARQUET_SRC_FILE = os.getenv('PARQUET_SRC_FILE')
+
+duckdb_con = duckdb.connect()
+duckdb_con.execute("call dbgen(sf=0.01)")
+duckdb_con.execute(f"copy lineitem to '{PARQUET_SRC_FILE}' (FORMAT PARQUET)")
\ No newline at end of file
diff --git a/scripts/data_generators/generate_spark_local/lineitem_001_deletes/q00.sql b/scripts/data_generators/generate_spark_local/lineitem_001_deletes/q00.sql new
file mode 100644 index 0000000..959157d --- /dev/null +++ b/scripts/data_generators/generate_spark_local/lineitem_001_deletes/q00.sql @@ -0,0 +1,6 @@ +CREATE or REPLACE TABLE iceberg_catalog.lineitem_001_deletes + TBLPROPERTIES ( + 'format-version'='2', + 'write.update.mode'='merge-on-read' + ) +AS SELECT * FROM parquet_file_view; \ No newline at end of file diff --git a/scripts/data_generators/generate_spark_local/lineitem_001_deletes/q01.sql b/scripts/data_generators/generate_spark_local/lineitem_001_deletes/q01.sql new file mode 100644 index 0000000..2894b39 --- /dev/null +++ b/scripts/data_generators/generate_spark_local/lineitem_001_deletes/q01.sql @@ -0,0 +1,11 @@ +update iceberg_catalog.lineitem_001_deletes +set l_orderkey=NULL, + l_partkey=NULL, + l_suppkey=NULL, + l_linenumber=NULL, + l_quantity=NULL, + l_extendedprice=NULL, + l_discount=NULL, + l_shipdate=NULL, + l_comment=NULL +where l_partkey % 2 = 0; \ No newline at end of file diff --git a/scripts/data_generators/generate_spark_local/lineitem_001_deletes/setup.py b/scripts/data_generators/generate_spark_local/lineitem_001_deletes/setup.py new file mode 100644 index 0000000..df29dff --- /dev/null +++ b/scripts/data_generators/generate_spark_local/lineitem_001_deletes/setup.py @@ -0,0 +1,8 @@ +import duckdb +import os + +PARQUET_SRC_FILE = os.getenv('PARQUET_SRC_FILE') + +duckdb_con = duckdb.connect() +duckdb_con.execute("call dbgen(sf=0.01)") +duckdb_con.execute(f"copy lineitem to '{PARQUET_SRC_FILE}' (FORMAT PARQUET)") \ No newline at end of file diff --git a/scripts/data_generators/generate_spark_local/lineitem_partitioned_l_shipmode/q00.sql b/scripts/data_generators/generate_spark_local/lineitem_partitioned_l_shipmode/q00.sql new file mode 100644 index 0000000..58388c9 --- /dev/null +++ b/scripts/data_generators/generate_spark_local/lineitem_partitioned_l_shipmode/q00.sql @@ -0,0 +1,8 @@ +CREATE OR REPLACE TABLE iceberg_catalog.lineitem_partitioned_l_shipmode +USING iceberg +PARTITIONED BY (l_shipmode) +TBLPROPERTIES ( + 'format-version'='2', + 'write.update.mode'='merge-on-read' +) +as select * from parquet_file_view; diff --git a/scripts/data_generators/generate_spark_local/lineitem_partitioned_l_shipmode/q01.sql b/scripts/data_generators/generate_spark_local/lineitem_partitioned_l_shipmode/q01.sql new file mode 100644 index 0000000..8a5e90b --- /dev/null +++ b/scripts/data_generators/generate_spark_local/lineitem_partitioned_l_shipmode/q01.sql @@ -0,0 +1 @@ +delete from iceberg_catalog.lineitem_partitioned_l_shipmode where l_shipmode = 'TRUCK'; \ No newline at end of file diff --git a/scripts/data_generators/generate_spark_local/lineitem_partitioned_l_shipmode/setup.py b/scripts/data_generators/generate_spark_local/lineitem_partitioned_l_shipmode/setup.py new file mode 100644 index 0000000..df29dff --- /dev/null +++ b/scripts/data_generators/generate_spark_local/lineitem_partitioned_l_shipmode/setup.py @@ -0,0 +1,8 @@ +import duckdb +import os + +PARQUET_SRC_FILE = os.getenv('PARQUET_SRC_FILE') + +duckdb_con = duckdb.connect() +duckdb_con.execute("call dbgen(sf=0.01)") +duckdb_con.execute(f"copy lineitem to '{PARQUET_SRC_FILE}' (FORMAT PARQUET)") \ No newline at end of file diff --git a/scripts/data_generators/generate_spark_local/lineitem_partitioned_l_shipmode_deletes/q00.sql b/scripts/data_generators/generate_spark_local/lineitem_partitioned_l_shipmode_deletes/q00.sql new file mode 100644 index 0000000..dc7d4b0 --- /dev/null +++ 
b/scripts/data_generators/generate_spark_local/lineitem_partitioned_l_shipmode_deletes/q00.sql @@ -0,0 +1,8 @@ +CREATE OR REPLACE TABLE iceberg_catalog.lineitem_partitioned_l_shipmode_deletes +USING iceberg +PARTITIONED BY (l_shipmode) +TBLPROPERTIES ( + 'format-version'='2', + 'write.update.mode'='merge-on-read' +) +as select * from parquet_file_view; diff --git a/scripts/data_generators/generate_spark_local/lineitem_partitioned_l_shipmode_deletes/q01.sql b/scripts/data_generators/generate_spark_local/lineitem_partitioned_l_shipmode_deletes/q01.sql new file mode 100644 index 0000000..9d28acf --- /dev/null +++ b/scripts/data_generators/generate_spark_local/lineitem_partitioned_l_shipmode_deletes/q01.sql @@ -0,0 +1,6 @@ +UPDATE iceberg_catalog.lineitem_partitioned_l_shipmode_deletes +Set l_comment=NULL, + l_quantity=NULL, + l_discount=NULL, + l_linestatus=NULL +where l_linenumber = 3 or l_linenumber = 4 or l_linenumber = 5; \ No newline at end of file diff --git a/scripts/data_generators/generate_spark_local/lineitem_partitioned_l_shipmode_deletes/setup.py b/scripts/data_generators/generate_spark_local/lineitem_partitioned_l_shipmode_deletes/setup.py new file mode 100644 index 0000000..df29dff --- /dev/null +++ b/scripts/data_generators/generate_spark_local/lineitem_partitioned_l_shipmode_deletes/setup.py @@ -0,0 +1,8 @@ +import duckdb +import os + +PARQUET_SRC_FILE = os.getenv('PARQUET_SRC_FILE') + +duckdb_con = duckdb.connect() +duckdb_con.execute("call dbgen(sf=0.01)") +duckdb_con.execute(f"copy lineitem to '{PARQUET_SRC_FILE}' (FORMAT PARQUET)") \ No newline at end of file diff --git a/scripts/data_generators/generate_spark_local/lineitem_sf1_deletes/q00.sql b/scripts/data_generators/generate_spark_local/lineitem_sf1_deletes/q00.sql new file mode 100644 index 0000000..d9eae47 --- /dev/null +++ b/scripts/data_generators/generate_spark_local/lineitem_sf1_deletes/q00.sql @@ -0,0 +1,6 @@ +CREATE or REPLACE TABLE iceberg_catalog.lineitem_sf1_deletes + TBLPROPERTIES ( + 'format-version'='2', + 'write.update.mode'='merge-on-read' + ) +AS SELECT * FROM parquet_file_view; \ No newline at end of file diff --git a/scripts/data_generators/generate_spark_local/lineitem_sf1_deletes/q01.sql b/scripts/data_generators/generate_spark_local/lineitem_sf1_deletes/q01.sql new file mode 100644 index 0000000..6b28e70 --- /dev/null +++ b/scripts/data_generators/generate_spark_local/lineitem_sf1_deletes/q01.sql @@ -0,0 +1,11 @@ +update iceberg_catalog.lineitem_sf1_deletes +set l_orderkey=NULL, + l_partkey=NULL, + l_suppkey=NULL, + l_linenumber=NULL, + l_quantity=NULL, + l_extendedprice=NULL, + l_discount=NULL, + l_shipdate=NULL, + l_comment=NULL +where l_partkey % 2 = 0; \ No newline at end of file diff --git a/scripts/data_generators/generate_spark_local/lineitem_sf1_deletes/setup.py b/scripts/data_generators/generate_spark_local/lineitem_sf1_deletes/setup.py new file mode 100644 index 0000000..6fc4355 --- /dev/null +++ b/scripts/data_generators/generate_spark_local/lineitem_sf1_deletes/setup.py @@ -0,0 +1,8 @@ +import duckdb +import os + +PARQUET_SRC_FILE = os.getenv('PARQUET_SRC_FILE') + +duckdb_con = duckdb.connect() +duckdb_con.execute("call dbgen(sf=1)") +duckdb_con.execute(f"copy lineitem to '{PARQUET_SRC_FILE}' (FORMAT PARQUET)") \ No newline at end of file diff --git a/scripts/data_generators/generate_spark_local/lineitem_sf_01_1_delete/q00.sql b/scripts/data_generators/generate_spark_local/lineitem_sf_01_1_delete/q00.sql new file mode 100644 index 0000000..96c1d14 --- /dev/null +++ 
b/scripts/data_generators/generate_spark_local/lineitem_sf_01_1_delete/q00.sql @@ -0,0 +1,6 @@ +CREATE or REPLACE TABLE iceberg_catalog.lineitem_sf_01_1_delete + TBLPROPERTIES ( + 'format-version'='2', + 'write.update.mode'='merge-on-read' + ) +AS SELECT * FROM parquet_file_view; \ No newline at end of file diff --git a/scripts/data_generators/generate_spark_local/lineitem_sf_01_1_delete/q01.sql b/scripts/data_generators/generate_spark_local/lineitem_sf_01_1_delete/q01.sql new file mode 100644 index 0000000..ecb6a3f --- /dev/null +++ b/scripts/data_generators/generate_spark_local/lineitem_sf_01_1_delete/q01.sql @@ -0,0 +1 @@ +delete from iceberg_catalog.lineitem_sf_01_1_delete where l_orderkey=10053 and l_partkey = 77; \ No newline at end of file diff --git a/scripts/data_generators/generate_spark_local/lineitem_sf_01_1_delete/setup.py b/scripts/data_generators/generate_spark_local/lineitem_sf_01_1_delete/setup.py new file mode 100644 index 0000000..df29dff --- /dev/null +++ b/scripts/data_generators/generate_spark_local/lineitem_sf_01_1_delete/setup.py @@ -0,0 +1,8 @@ +import duckdb +import os + +PARQUET_SRC_FILE = os.getenv('PARQUET_SRC_FILE') + +duckdb_con = duckdb.connect() +duckdb_con.execute("call dbgen(sf=0.01)") +duckdb_con.execute(f"copy lineitem to '{PARQUET_SRC_FILE}' (FORMAT PARQUET)") \ No newline at end of file diff --git a/scripts/data_generators/generate_spark_local/lineitem_sf_01_no_deletes/q00.sql b/scripts/data_generators/generate_spark_local/lineitem_sf_01_no_deletes/q00.sql new file mode 100644 index 0000000..9e02481 --- /dev/null +++ b/scripts/data_generators/generate_spark_local/lineitem_sf_01_no_deletes/q00.sql @@ -0,0 +1,6 @@ +CREATE or REPLACE TABLE iceberg_catalog.lineitem_sf_01_no_deletes + TBLPROPERTIES ( + 'format-version'='2', + 'write.update.mode'='merge-on-read' + ) +AS SELECT * FROM parquet_file_view; \ No newline at end of file diff --git a/scripts/data_generators/generate_spark_local/lineitem_sf_01_no_deletes/setup.py b/scripts/data_generators/generate_spark_local/lineitem_sf_01_no_deletes/setup.py new file mode 100644 index 0000000..df29dff --- /dev/null +++ b/scripts/data_generators/generate_spark_local/lineitem_sf_01_no_deletes/setup.py @@ -0,0 +1,8 @@ +import duckdb +import os + +PARQUET_SRC_FILE = os.getenv('PARQUET_SRC_FILE') + +duckdb_con = duckdb.connect() +duckdb_con.execute("call dbgen(sf=0.01)") +duckdb_con.execute(f"copy lineitem to '{PARQUET_SRC_FILE}' (FORMAT PARQUET)") \ No newline at end of file diff --git a/scripts/data_generators/generate_spark_local/table_with_deletes/q00.sql b/scripts/data_generators/generate_spark_local/table_with_deletes/q00.sql new file mode 100644 index 0000000..bfb84c2 --- /dev/null +++ b/scripts/data_generators/generate_spark_local/table_with_deletes/q00.sql @@ -0,0 +1,6 @@ +CREATE or REPLACE TABLE iceberg_catalog.table_with_deletes + TBLPROPERTIES ( + 'format-version'='2', + 'write.update.mode'='merge-on-read' + ) +AS SELECT * FROM parquet_file_view; \ No newline at end of file diff --git a/scripts/data_generators/generate_spark_local/table_with_deletes/q01.sql b/scripts/data_generators/generate_spark_local/table_with_deletes/q01.sql new file mode 100644 index 0000000..6e73313 --- /dev/null +++ b/scripts/data_generators/generate_spark_local/table_with_deletes/q01.sql @@ -0,0 +1,11 @@ +update iceberg_catalog.table_with_deletes +set l_orderkey=NULL, + l_partkey=NULL, + l_suppkey=NULL, + l_linenumber=NULL, + l_quantity=NULL, + l_extendedprice=NULL, + l_discount=NULL, + l_shipdate=NULL, + l_comment=NULL +where 
l_partkey % 2 = 0; \ No newline at end of file diff --git a/scripts/data_generators/generate_spark_local/table_with_deletes/setup.py b/scripts/data_generators/generate_spark_local/table_with_deletes/setup.py new file mode 100644 index 0000000..df29dff --- /dev/null +++ b/scripts/data_generators/generate_spark_local/table_with_deletes/setup.py @@ -0,0 +1,8 @@ +import duckdb +import os + +PARQUET_SRC_FILE = os.getenv('PARQUET_SRC_FILE') + +duckdb_con = duckdb.connect() +duckdb_con.execute("call dbgen(sf=0.01)") +duckdb_con.execute(f"copy lineitem to '{PARQUET_SRC_FILE}' (FORMAT PARQUET)") \ No newline at end of file diff --git a/scripts/data_generators/generate_spark_rest/lineitem_001_deletes/q00.sql b/scripts/data_generators/generate_spark_rest/lineitem_001_deletes/q00.sql new file mode 100644 index 0000000..c581d86 --- /dev/null +++ b/scripts/data_generators/generate_spark_rest/lineitem_001_deletes/q00.sql @@ -0,0 +1,6 @@ +CREATE or REPLACE TABLE default.lineitem_001_deletes + TBLPROPERTIES ( + 'format-version'='2', + 'write.update.mode'='merge-on-read' + ) +AS SELECT * FROM parquet_file_view; \ No newline at end of file diff --git a/scripts/data_generators/generate_spark_rest/lineitem_001_deletes/q01.sql b/scripts/data_generators/generate_spark_rest/lineitem_001_deletes/q01.sql new file mode 100644 index 0000000..472849d --- /dev/null +++ b/scripts/data_generators/generate_spark_rest/lineitem_001_deletes/q01.sql @@ -0,0 +1,11 @@ +update default.lineitem_001_deletes +set l_orderkey=NULL, + l_partkey=NULL, + l_suppkey=NULL, + l_linenumber=NULL, + l_quantity=NULL, + l_extendedprice=NULL, + l_discount=NULL, + l_shipdate=NULL, + l_comment=NULL +where l_partkey % 2 = 0; \ No newline at end of file diff --git a/scripts/data_generators/generate_spark_rest/lineitem_001_deletes/setup.py b/scripts/data_generators/generate_spark_rest/lineitem_001_deletes/setup.py new file mode 100644 index 0000000..df29dff --- /dev/null +++ b/scripts/data_generators/generate_spark_rest/lineitem_001_deletes/setup.py @@ -0,0 +1,8 @@ +import duckdb +import os + +PARQUET_SRC_FILE = os.getenv('PARQUET_SRC_FILE') + +duckdb_con = duckdb.connect() +duckdb_con.execute("call dbgen(sf=0.01)") +duckdb_con.execute(f"copy lineitem to '{PARQUET_SRC_FILE}' (FORMAT PARQUET)") \ No newline at end of file diff --git a/scripts/data_generators/generate_spark_rest/lineitem_partitioned_l_shipmode/q00.sql b/scripts/data_generators/generate_spark_rest/lineitem_partitioned_l_shipmode/q00.sql new file mode 100644 index 0000000..fa22eb3 --- /dev/null +++ b/scripts/data_generators/generate_spark_rest/lineitem_partitioned_l_shipmode/q00.sql @@ -0,0 +1,8 @@ +CREATE OR REPLACE TABLE default.lineitem_partitioned_l_shipmode +USING iceberg +PARTITIONED BY (l_shipmode) +TBLPROPERTIES ( + 'format-version'='2', + 'write.update.mode'='merge-on-read' +) +as select * from parquet_file_view; diff --git a/scripts/data_generators/generate_spark_rest/lineitem_partitioned_l_shipmode/q01.sql b/scripts/data_generators/generate_spark_rest/lineitem_partitioned_l_shipmode/q01.sql new file mode 100644 index 0000000..c7047b5 --- /dev/null +++ b/scripts/data_generators/generate_spark_rest/lineitem_partitioned_l_shipmode/q01.sql @@ -0,0 +1 @@ +delete from default.lineitem_partitioned_l_shipmode where l_shipmode = 'TRUCK'; \ No newline at end of file diff --git a/scripts/data_generators/generate_spark_rest/lineitem_partitioned_l_shipmode/setup.py b/scripts/data_generators/generate_spark_rest/lineitem_partitioned_l_shipmode/setup.py new file mode 100644 index 
0000000..df29dff --- /dev/null +++ b/scripts/data_generators/generate_spark_rest/lineitem_partitioned_l_shipmode/setup.py @@ -0,0 +1,8 @@ +import duckdb +import os + +PARQUET_SRC_FILE = os.getenv('PARQUET_SRC_FILE') + +duckdb_con = duckdb.connect() +duckdb_con.execute("call dbgen(sf=0.01)") +duckdb_con.execute(f"copy lineitem to '{PARQUET_SRC_FILE}' (FORMAT PARQUET)") \ No newline at end of file diff --git a/scripts/data_generators/generate_spark_rest/lineitem_partitioned_l_shipmode_deletes/q00.sql b/scripts/data_generators/generate_spark_rest/lineitem_partitioned_l_shipmode_deletes/q00.sql new file mode 100644 index 0000000..e1efd19 --- /dev/null +++ b/scripts/data_generators/generate_spark_rest/lineitem_partitioned_l_shipmode_deletes/q00.sql @@ -0,0 +1,8 @@ +CREATE OR REPLACE TABLE default.lineitem_partitioned_l_shipmode_deletes +USING iceberg +PARTITIONED BY (l_shipmode) +TBLPROPERTIES ( + 'format-version'='2', + 'write.update.mode'='merge-on-read' +) +as select * from parquet_file_view; diff --git a/scripts/data_generators/generate_spark_rest/lineitem_partitioned_l_shipmode_deletes/q01.sql b/scripts/data_generators/generate_spark_rest/lineitem_partitioned_l_shipmode_deletes/q01.sql new file mode 100644 index 0000000..8116388 --- /dev/null +++ b/scripts/data_generators/generate_spark_rest/lineitem_partitioned_l_shipmode_deletes/q01.sql @@ -0,0 +1,6 @@ +UPDATE default.lineitem_partitioned_l_shipmode_deletes +Set l_comment=NULL, + l_quantity=NULL, + l_discount=NULL, + l_linestatus=NULL +where l_linenumber = 3 or l_linenumber = 4 or l_linenumber = 5; \ No newline at end of file diff --git a/scripts/data_generators/generate_spark_rest/lineitem_partitioned_l_shipmode_deletes/setup.py b/scripts/data_generators/generate_spark_rest/lineitem_partitioned_l_shipmode_deletes/setup.py new file mode 100644 index 0000000..df29dff --- /dev/null +++ b/scripts/data_generators/generate_spark_rest/lineitem_partitioned_l_shipmode_deletes/setup.py @@ -0,0 +1,8 @@ +import duckdb +import os + +PARQUET_SRC_FILE = os.getenv('PARQUET_SRC_FILE') + +duckdb_con = duckdb.connect() +duckdb_con.execute("call dbgen(sf=0.01)") +duckdb_con.execute(f"copy lineitem to '{PARQUET_SRC_FILE}' (FORMAT PARQUET)") \ No newline at end of file diff --git a/scripts/data_generators/generate_spark_rest/lineitem_sf1_deletes/q00.sql b/scripts/data_generators/generate_spark_rest/lineitem_sf1_deletes/q00.sql new file mode 100644 index 0000000..4112dad --- /dev/null +++ b/scripts/data_generators/generate_spark_rest/lineitem_sf1_deletes/q00.sql @@ -0,0 +1,6 @@ +CREATE or REPLACE TABLE default.lineitem_sf1_deletes + TBLPROPERTIES ( + 'format-version'='2', + 'write.update.mode'='merge-on-read' + ) +AS SELECT * FROM parquet_file_view; \ No newline at end of file diff --git a/scripts/data_generators/generate_spark_rest/lineitem_sf1_deletes/q01.sql b/scripts/data_generators/generate_spark_rest/lineitem_sf1_deletes/q01.sql new file mode 100644 index 0000000..4f789bd --- /dev/null +++ b/scripts/data_generators/generate_spark_rest/lineitem_sf1_deletes/q01.sql @@ -0,0 +1,11 @@ +update default.lineitem_sf1_deletes +set l_orderkey=NULL, + l_partkey=NULL, + l_suppkey=NULL, + l_linenumber=NULL, + l_quantity=NULL, + l_extendedprice=NULL, + l_discount=NULL, + l_shipdate=NULL, + l_comment=NULL +where l_partkey % 2 = 0; \ No newline at end of file diff --git a/scripts/data_generators/generate_spark_rest/lineitem_sf1_deletes/setup.py b/scripts/data_generators/generate_spark_rest/lineitem_sf1_deletes/setup.py new file mode 100644 index 0000000..6fc4355 --- 
/dev/null +++ b/scripts/data_generators/generate_spark_rest/lineitem_sf1_deletes/setup.py @@ -0,0 +1,8 @@ +import duckdb +import os + +PARQUET_SRC_FILE = os.getenv('PARQUET_SRC_FILE') + +duckdb_con = duckdb.connect() +duckdb_con.execute("call dbgen(sf=1)") +duckdb_con.execute(f"copy lineitem to '{PARQUET_SRC_FILE}' (FORMAT PARQUET)") \ No newline at end of file diff --git a/scripts/data_generators/generate_spark_rest/lineitem_sf_01_1_delete/q00.sql b/scripts/data_generators/generate_spark_rest/lineitem_sf_01_1_delete/q00.sql new file mode 100644 index 0000000..7893470 --- /dev/null +++ b/scripts/data_generators/generate_spark_rest/lineitem_sf_01_1_delete/q00.sql @@ -0,0 +1,6 @@ +CREATE or REPLACE TABLE default.lineitem_sf_01_1_delete + TBLPROPERTIES ( + 'format-version'='2', + 'write.update.mode'='merge-on-read' + ) +AS SELECT * FROM parquet_file_view; \ No newline at end of file diff --git a/scripts/data_generators/generate_spark_rest/lineitem_sf_01_1_delete/q01.sql b/scripts/data_generators/generate_spark_rest/lineitem_sf_01_1_delete/q01.sql new file mode 100644 index 0000000..8fcc58e --- /dev/null +++ b/scripts/data_generators/generate_spark_rest/lineitem_sf_01_1_delete/q01.sql @@ -0,0 +1 @@ +delete from default.lineitem_sf_01_1_delete where l_orderkey=10053 and l_partkey = 77; \ No newline at end of file diff --git a/scripts/data_generators/generate_spark_rest/lineitem_sf_01_1_delete/setup.py b/scripts/data_generators/generate_spark_rest/lineitem_sf_01_1_delete/setup.py new file mode 100644 index 0000000..df29dff --- /dev/null +++ b/scripts/data_generators/generate_spark_rest/lineitem_sf_01_1_delete/setup.py @@ -0,0 +1,8 @@ +import duckdb +import os + +PARQUET_SRC_FILE = os.getenv('PARQUET_SRC_FILE') + +duckdb_con = duckdb.connect() +duckdb_con.execute("call dbgen(sf=0.01)") +duckdb_con.execute(f"copy lineitem to '{PARQUET_SRC_FILE}' (FORMAT PARQUET)") \ No newline at end of file diff --git a/scripts/data_generators/generate_spark_rest/lineitem_sf_01_no_deletes/q00.sql b/scripts/data_generators/generate_spark_rest/lineitem_sf_01_no_deletes/q00.sql new file mode 100644 index 0000000..b21fa20 --- /dev/null +++ b/scripts/data_generators/generate_spark_rest/lineitem_sf_01_no_deletes/q00.sql @@ -0,0 +1,6 @@ +CREATE or REPLACE TABLE default.lineitem_sf_01_no_deletes + TBLPROPERTIES ( + 'format-version'='2', + 'write.update.mode'='merge-on-read' + ) +AS SELECT * FROM parquet_file_view; \ No newline at end of file diff --git a/scripts/data_generators/generate_spark_rest/lineitem_sf_01_no_deletes/setup.py b/scripts/data_generators/generate_spark_rest/lineitem_sf_01_no_deletes/setup.py new file mode 100644 index 0000000..df29dff --- /dev/null +++ b/scripts/data_generators/generate_spark_rest/lineitem_sf_01_no_deletes/setup.py @@ -0,0 +1,8 @@ +import duckdb +import os + +PARQUET_SRC_FILE = os.getenv('PARQUET_SRC_FILE') + +duckdb_con = duckdb.connect() +duckdb_con.execute("call dbgen(sf=0.01)") +duckdb_con.execute(f"copy lineitem to '{PARQUET_SRC_FILE}' (FORMAT PARQUET)") \ No newline at end of file diff --git a/scripts/data_generators/tmp_data/tmp.parquet b/scripts/data_generators/tmp_data/tmp.parquet index 5475f1e..3600bba 100644 Binary files a/scripts/data_generators/tmp_data/tmp.parquet and b/scripts/data_generators/tmp_data/tmp.parquet differ diff --git a/src/common/iceberg.cpp b/src/common/iceberg.cpp index 2d224d5..90fa7b8 100644 --- a/src/common/iceberg.cpp +++ b/src/common/iceberg.cpp @@ -10,26 +10,42 @@ #include "avro/ValidSchema.hh" #include "avro/Stream.hh" +#include 
"manifest_reader.hpp" + namespace duckdb { -IcebergTable IcebergTable::Load(const string &iceberg_path, IcebergSnapshot &snapshot, FileSystem &fs, - bool allow_moved_paths, string metadata_compression_codec) { +IcebergTable IcebergTable::Load(const string &iceberg_path, IcebergSnapshot &snapshot, FileSystem &fs, const IcebergOptions &options) { IcebergTable ret; ret.path = iceberg_path; ret.snapshot = snapshot; - auto manifest_list_full_path = allow_moved_paths - ? IcebergUtils::GetFullPath(iceberg_path, snapshot.manifest_list, fs) - : snapshot.manifest_list; - auto manifests = ReadManifestListFile(manifest_list_full_path, fs, snapshot.iceberg_format_version); - - for (auto &manifest : manifests) { - auto manifest_entry_full_path = allow_moved_paths - ? IcebergUtils::GetFullPath(iceberg_path, manifest.manifest_path, fs) - : manifest.manifest_path; - auto manifest_paths = ReadManifestEntries(manifest_entry_full_path, fs, snapshot.iceberg_format_version); + unique_ptr manifest_reader; + unique_ptr manifest_entry_reader; + if (snapshot.iceberg_format_version == 1) { + manifest_entry_reader = make_uniq(iceberg_path, snapshot.manifest_list, fs, options); + manifest_reader = make_uniq(iceberg_path, snapshot.manifest_list, fs, options); + } else if (snapshot.iceberg_format_version == 2) { + manifest_entry_reader = make_uniq(iceberg_path, snapshot.manifest_list, fs, options); + manifest_reader = make_uniq(iceberg_path, snapshot.manifest_list, fs, options); + } else { + throw InvalidInputException("Reading from Iceberg version %d is not supported yet", snapshot.iceberg_format_version); + } - ret.entries.push_back({std::move(manifest), std::move(manifest_paths)}); + while (!manifest_reader->Finished()) { + auto manifest = manifest_reader->GetNext(); + if (!manifest) { + break; + } + auto state = manifest_entry_reader->InitializeScan(*manifest); + vector manifest_paths; + while (!state.finished) { + auto new_entry = manifest_entry_reader->GetNext(state); + if (!new_entry) { + break; + } + manifest_paths.push_back(std::move(*new_entry)); + } + ret.entries.push_back({std::move(*manifest), std::move(manifest_paths)}); } return ret; @@ -118,7 +134,7 @@ unique_ptr IcebergSnapshot::GetParseInfo(yyjson_doc &metadata return make_uniq(std::move(info)); } -unique_ptr IcebergSnapshot::GetParseInfo(const string &path, FileSystem &fs, string metadata_compression_codec) { +unique_ptr IcebergSnapshot::GetParseInfo(const string &path, FileSystem &fs, const string &metadata_compression_codec) { auto metadata_json = ReadMetaData(path, fs, metadata_compression_codec); auto* doc = yyjson_read(metadata_json.c_str(), metadata_json.size(), 0); if (doc == nullptr) { @@ -133,53 +149,49 @@ unique_ptr IcebergSnapshot::GetParseInfo(const string &path, return parse_info; } -IcebergSnapshot IcebergSnapshot::GetLatestSnapshot(const string &path, FileSystem &fs, - string metadata_compression_codec, bool skip_schema_inference) { - auto info = GetParseInfo(path, fs, metadata_compression_codec); +IcebergSnapshot IcebergSnapshot::GetLatestSnapshot(const string &path, FileSystem &fs, const IcebergOptions &options) { + auto info = GetParseInfo(path, fs, options.metadata_compression_codec); auto latest_snapshot = FindLatestSnapshotInternal(info->snapshots); if (!latest_snapshot) { throw IOException("No snapshots found"); } - return ParseSnapShot(latest_snapshot, info->iceberg_version, info->schema_id, info->schemas, metadata_compression_codec, skip_schema_inference); + return ParseSnapShot(latest_snapshot, info->iceberg_version, 
info->schema_id, info->schemas, options); } -IcebergSnapshot IcebergSnapshot::GetSnapshotById(const string &path, FileSystem &fs, idx_t snapshot_id, - string metadata_compression_codec, bool skip_schema_inference) { - auto info = GetParseInfo(path, fs, metadata_compression_codec); +IcebergSnapshot IcebergSnapshot::GetSnapshotById(const string &path, FileSystem &fs, idx_t snapshot_id, const IcebergOptions &options) { + auto info = GetParseInfo(path, fs, options.metadata_compression_codec); auto snapshot = FindSnapshotByIdInternal(info->snapshots, snapshot_id); if (!snapshot) { throw IOException("Could not find snapshot with id " + to_string(snapshot_id)); } - return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas, - metadata_compression_codec, skip_schema_inference); + return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas, options); } -IcebergSnapshot IcebergSnapshot::GetSnapshotByTimestamp(const string &path, FileSystem &fs, timestamp_t timestamp, string metadata_compression_codec, - bool skip_schema_inference) { - auto info = GetParseInfo(path, fs, metadata_compression_codec); +IcebergSnapshot IcebergSnapshot::GetSnapshotByTimestamp(const string &path, FileSystem &fs, timestamp_t timestamp, const IcebergOptions &options) { + auto info = GetParseInfo(path, fs, options.metadata_compression_codec); auto snapshot = FindSnapshotByIdTimestampInternal(info->snapshots, timestamp); if (!snapshot) { throw IOException("Could not find latest snapshots for timestamp " + Timestamp::ToString(timestamp)); } - return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas, metadata_compression_codec, skip_schema_inference); + return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas, options); } // Function to generate a metadata file url from version and format string // default format is "v%s%s.metadata.json" -> v00###-xxxxxxxxx-.gz.metadata.json" -string GenerateMetaDataUrl(FileSystem &fs, const string &meta_path, string &table_version, string &metadata_compression_codec, string &version_format = DEFAULT_TABLE_VERSION_FORMAT) { +string GenerateMetaDataUrl(FileSystem &fs, const string &meta_path, string &table_version, const IcebergOptions &options) { // TODO: Need to URL Encode table_version string compression_suffix = ""; string url; - if (metadata_compression_codec == "gzip") { + if (options.metadata_compression_codec == "gzip") { compression_suffix = ".gz"; } - for(auto try_format : StringUtil::Split(version_format, ',')) { + for(auto try_format : StringUtil::Split(options.version_name_format, ',')) { url = fs.JoinPath(meta_path, StringUtil::Format(try_format, table_version, compression_suffix)); if(fs.FileExists(url)) { return url; @@ -187,13 +199,16 @@ string GenerateMetaDataUrl(FileSystem &fs, const string &meta_path, string &tabl } throw IOException( - "Iceberg metadata file not found for table version '%s' using '%s' compression and format(s): '%s'", table_version, metadata_compression_codec, version_format); + "Iceberg metadata file not found for table version '%s' using '%s' compression and format(s): '%s'", table_version, options.metadata_compression_codec, options.version_name_format); } -string IcebergSnapshot::GetMetaDataPath(ClientContext &context, const string &path, FileSystem &fs, string metadata_compression_codec, string table_version = DEFAULT_TABLE_VERSION, string version_format = DEFAULT_TABLE_VERSION_FORMAT) { +string IcebergSnapshot::GetMetaDataPath(ClientContext 
&context, const string &path, FileSystem &fs, const IcebergOptions &options) { string version_hint; string meta_path = fs.JoinPath(path, "metadata"); + + auto &table_version = options.table_version; + if (StringUtil::EndsWith(path, ".json")) { // We've been given a real metadata path. Nothing else to do. return path; @@ -201,17 +216,17 @@ string IcebergSnapshot::GetMetaDataPath(ClientContext &context, const string &pa if(StringUtil::EndsWith(table_version, ".text")||StringUtil::EndsWith(table_version, ".txt")) { // We were given a hint filename version_hint = GetTableVersionFromHint(meta_path, fs, table_version); - return GenerateMetaDataUrl(fs, meta_path, version_hint, metadata_compression_codec, version_format); + return GenerateMetaDataUrl(fs, meta_path, version_hint, options); } if (table_version != UNKNOWN_TABLE_VERSION) { // We were given an explicit version number version_hint = table_version; - return GenerateMetaDataUrl(fs, meta_path, version_hint, metadata_compression_codec, version_format); + return GenerateMetaDataUrl(fs, meta_path, version_hint, options); } if (fs.FileExists(fs.JoinPath(meta_path, DEFAULT_VERSION_HINT_FILE))) { // We're guessing, but a version-hint.text exists so we'll use that version_hint = GetTableVersionFromHint(meta_path, fs, DEFAULT_VERSION_HINT_FILE); - return GenerateMetaDataUrl(fs, meta_path, version_hint, metadata_compression_codec, version_format); + return GenerateMetaDataUrl(fs, meta_path, version_hint, options); } if (!UnsafeVersionGuessingEnabled(context)) { // Make sure we're allowed to guess versions @@ -219,27 +234,23 @@ string IcebergSnapshot::GetMetaDataPath(ClientContext &context, const string &pa } // We are allowed to guess to guess from file paths - return GuessTableVersion(meta_path, fs, table_version, metadata_compression_codec, version_format); + return GuessTableVersion(meta_path, fs, options); } - -string IcebergSnapshot::ReadMetaData(const string &path, FileSystem &fs, string metadata_compression_codec) { +string IcebergSnapshot::ReadMetaData(const string &path, FileSystem &fs, const string &metadata_compression_codec) { if (metadata_compression_codec == "gzip") { return IcebergUtils::GzFileToString(path, fs); } return IcebergUtils::FileToString(path, fs); } - -IcebergSnapshot IcebergSnapshot::ParseSnapShot(yyjson_val *snapshot, idx_t iceberg_format_version, idx_t schema_id, - vector &schemas, string metadata_compression_codec, - bool skip_schema_inference) { +IcebergSnapshot IcebergSnapshot::ParseSnapShot(yyjson_val *snapshot, idx_t iceberg_format_version, idx_t schema_id, vector &schemas, const IcebergOptions &options) { IcebergSnapshot ret; auto snapshot_tag = yyjson_get_type(snapshot); if (snapshot_tag != YYJSON_TYPE_OBJ) { throw IOException("Invalid snapshot field found parsing iceberg metadata.json"); } - ret.metadata_compression_codec = metadata_compression_codec; + ret.metadata_compression_codec = options.metadata_compression_codec; if (iceberg_format_version == 1) { ret.sequence_number = 0; } else if (iceberg_format_version == 2) { @@ -251,7 +262,7 @@ IcebergSnapshot IcebergSnapshot::ParseSnapShot(yyjson_val *snapshot, idx_t icebe ret.manifest_list = IcebergUtils::TryGetStrFromObject(snapshot, "manifest-list"); ret.iceberg_format_version = iceberg_format_version; ret.schema_id = schema_id; - if (!skip_schema_inference) { + if (!options.skip_schema_inference) { ret.schema = ParseSchema(schemas, ret.schema_id); } return ret; @@ -277,19 +288,21 @@ bool IcebergSnapshot::UnsafeVersionGuessingEnabled(ClientContext &context) { } 
-string IcebergSnapshot::GuessTableVersion(const string &meta_path, FileSystem &fs, string &table_version, string &metadata_compression_codec, string &version_format = DEFAULT_TABLE_VERSION_FORMAT) { +string IcebergSnapshot::GuessTableVersion(const string &meta_path, FileSystem &fs, const IcebergOptions &options) { string selected_metadata; string version_pattern = "*"; // TODO: Different "table_version" strings could customize this string compression_suffix = ""; - + + auto &metadata_compression_codec = options.metadata_compression_codec; + auto &version_format = options.version_name_format; if (metadata_compression_codec == "gzip") { compression_suffix = ".gz"; } - + for(auto try_format : StringUtil::Split(version_format, ',')) { auto glob_pattern = StringUtil::Format(try_format, version_pattern, compression_suffix); - + auto found_versions = fs.Glob(fs.JoinPath(meta_path, glob_pattern)); if(found_versions.size() > 0) { selected_metadata = PickTableVersion(found_versions, version_pattern, glob_pattern); @@ -298,7 +311,7 @@ string IcebergSnapshot::GuessTableVersion(const string &meta_path, FileSystem &f } } } - + throw IOException( "Could not guess Iceberg table version using '%s' compression and format(s): '%s'", metadata_compression_codec, version_format); @@ -315,7 +328,6 @@ string IcebergSnapshot::PickTableVersion(vector &found_metadata, string } } - yyjson_val *IcebergSnapshot::FindLatestSnapshotInternal(yyjson_val *snapshots) { size_t idx, max; yyjson_val *snapshot; @@ -373,4 +385,4 @@ yyjson_val *IcebergSnapshot::IcebergSnapshot::FindSnapshotByIdTimestampInternal( return max_snapshot; } -} // namespace duckdb +} // namespace duckdb \ No newline at end of file diff --git a/src/common/schema.cpp b/src/common/schema.cpp index 9645363..fe800ad 100644 --- a/src/common/schema.cpp +++ b/src/common/schema.cpp @@ -145,7 +145,7 @@ IcebergColumnDefinition IcebergColumnDefinition::ParseFromJson(yyjson_val *val) ret.id = IcebergUtils::TryGetNumFromObject(val, "id"); ret.name = IcebergUtils::TryGetStrFromObject(val, "name"); ret.type = ParseType(val); - ret.default_value = Value(); + ret.default_value = Value(ret.type); ret.required = IcebergUtils::TryGetBoolFromObject(val, "required"); return ret; diff --git a/src/iceberg_extension.cpp b/src/iceberg_extension.cpp index 38e721a..5864067 100644 --- a/src/iceberg_extension.cpp +++ b/src/iceberg_extension.cpp @@ -159,7 +159,7 @@ static void LoadInternal(DatabaseInstance &instance) { ); // Iceberg Table Functions - for (auto &fun : IcebergFunctions::GetTableFunctions()) { + for (auto &fun : IcebergFunctions::GetTableFunctions(instance)) { ExtensionUtil::RegisterFunction(instance, fun); } diff --git a/src/iceberg_functions.cpp b/src/iceberg_functions.cpp index a1e4721..d3ea52f 100644 --- a/src/iceberg_functions.cpp +++ b/src/iceberg_functions.cpp @@ -7,11 +7,11 @@ namespace duckdb { -vector IcebergFunctions::GetTableFunctions() { +vector IcebergFunctions::GetTableFunctions(DatabaseInstance &instance) { vector functions; functions.push_back(GetIcebergSnapshotsFunction()); - functions.push_back(GetIcebergScanFunction()); + functions.push_back(GetIcebergScanFunction(instance)); functions.push_back(GetIcebergMetadataFunction()); return functions; diff --git a/src/iceberg_functions/iceberg_metadata.cpp b/src/iceberg_functions/iceberg_metadata.cpp index f8c9557..ca1c29b 100644 --- a/src/iceberg_functions/iceberg_metadata.cpp +++ b/src/iceberg_functions/iceberg_metadata.cpp @@ -54,44 +54,40 @@ static unique_ptr IcebergMetaDataBind(ClientContext &context, 
Tabl FileSystem &fs = FileSystem::GetFileSystem(context); auto iceberg_path = input.inputs[0].ToString(); - bool allow_moved_paths = false; - string metadata_compression_codec = "none"; - bool skip_schema_inference = false; - string table_version = DEFAULT_TABLE_VERSION; - string version_name_format = DEFAULT_TABLE_VERSION_FORMAT; + IcebergOptions options; for (auto &kv : input.named_parameters) { auto loption = StringUtil::Lower(kv.first); if (loption == "allow_moved_paths") { - allow_moved_paths = BooleanValue::Get(kv.second); + options.allow_moved_paths = BooleanValue::Get(kv.second); } else if (loption == "metadata_compression_codec") { - metadata_compression_codec = StringValue::Get(kv.second); + options.metadata_compression_codec = StringValue::Get(kv.second); } else if (loption == "skip_schema_inference") { - skip_schema_inference = BooleanValue::Get(kv.second); + options.skip_schema_inference = BooleanValue::Get(kv.second); } else if (loption == "version") { - table_version = StringValue::Get(kv.second); + options.table_version = StringValue::Get(kv.second); } else if (loption == "version_name_format") { - version_name_format = StringValue::Get(kv.second); + options.version_name_format = StringValue::Get(kv.second); } } - auto iceberg_meta_path = IcebergSnapshot::GetMetaDataPath(context, iceberg_path, fs, metadata_compression_codec, table_version, version_name_format); + auto iceberg_meta_path = IcebergSnapshot::GetMetaDataPath(context, iceberg_path, fs, options); IcebergSnapshot snapshot_to_scan; if (input.inputs.size() > 1) { if (input.inputs[1].type() == LogicalType::UBIGINT) { - snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_meta_path, fs, input.inputs[1].GetValue(), metadata_compression_codec, skip_schema_inference); + snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_meta_path, fs, input.inputs[1].GetValue(), options); } else if (input.inputs[1].type() == LogicalType::TIMESTAMP) { snapshot_to_scan = - IcebergSnapshot::GetSnapshotByTimestamp(iceberg_meta_path, fs, input.inputs[1].GetValue(), metadata_compression_codec, skip_schema_inference); + IcebergSnapshot::GetSnapshotByTimestamp(iceberg_meta_path, fs, input.inputs[1].GetValue(), options); } else { throw InvalidInputException("Unknown argument type in IcebergScanBindReplace."); } } else { - snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_meta_path, fs, metadata_compression_codec, skip_schema_inference); + snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_meta_path, fs, options); } ret->iceberg_table = - make_uniq(IcebergTable::Load(iceberg_path, snapshot_to_scan, fs, allow_moved_paths, metadata_compression_codec)); + make_uniq(IcebergTable::Load(iceberg_path, snapshot_to_scan, fs, options)); auto manifest_types = IcebergManifest::Types(); return_types.insert(return_types.end(), manifest_types.begin(), manifest_types.end()); diff --git a/src/iceberg_functions/iceberg_multi_file_reader.cpp b/src/iceberg_functions/iceberg_multi_file_reader.cpp new file mode 100644 index 0000000..9335a08 --- /dev/null +++ b/src/iceberg_functions/iceberg_multi_file_reader.cpp @@ -0,0 +1,557 @@ +#include "iceberg_multi_file_reader.hpp" + +#include "duckdb/catalog/catalog_entry/table_function_catalog_entry.hpp" +#include "duckdb/common/exception.hpp" +#include "duckdb/execution/execution_context.hpp" +#include "duckdb/main/extension_util.hpp" +#include "duckdb/parallel/thread_context.hpp" +#include "duckdb/parser/tableref/table_function_ref.hpp" +#include 
"duckdb/parser/expression/constant_expression.hpp" + +namespace duckdb { + +IcebergMultiFileList::IcebergMultiFileList(ClientContext &context_p, const string &path, const IcebergOptions &options) + : MultiFileList({path}, FileGlobOptions::ALLOW_EMPTY), lock(), context(context_p), options(options) { +} + +string IcebergMultiFileList::ToDuckDBPath(const string &raw_path) { + return raw_path; +} + +string IcebergMultiFileList::GetPath() { + return GetPaths()[0]; +} + +void IcebergMultiFileList::Bind(vector &return_types, vector &names) { + if (!initialized) { + InitializeFiles(); + } + + auto &schema = snapshot.schema; + for (auto &schema_entry : schema) { + names.push_back(schema_entry.name); + return_types.push_back(schema_entry.type); + } +} + +unique_ptr IcebergMultiFileList::ComplexFilterPushdown(ClientContext &context, + const MultiFileReaderOptions &options, + MultiFilePushdownInfo &info, + vector> &filters) { + //! FIXME: We don't handle filter pushdown yet into the file list + //! Leaving the skeleton here because we want to add this relatively soon anyways + return nullptr; +} + +vector IcebergMultiFileList::GetAllFiles() { + throw NotImplementedException("NOT IMPLEMENTED"); +} + +FileExpandResult IcebergMultiFileList::GetExpandResult() { + // GetFile(1) will ensure files with index 0 and index 1 are expanded if they are available + GetFile(1); + + if (data_files.size() > 1) { + return FileExpandResult::MULTIPLE_FILES; + } else if (data_files.size() == 1) { + return FileExpandResult::SINGLE_FILE; + } + + return FileExpandResult::NO_FILES; +} + +idx_t IcebergMultiFileList::GetTotalFileCount() { + // FIXME: the 'added_files_count' + the 'existing_files_count' + // in the Manifest List should give us this information without scanning the manifest list + idx_t i = data_files.size(); + while (!GetFile(i).empty()) { + i++; + } + return data_files.size(); +} + +unique_ptr IcebergMultiFileList::GetCardinality(ClientContext &context) { + auto total_file_count = IcebergMultiFileList::GetTotalFileCount(); + + if (total_file_count == 0) { + return make_uniq(0, 0); + } + + // FIXME: visit metadata to get a cardinality count + + return nullptr; +} + +string IcebergMultiFileList::GetFile(idx_t file_id) { + if (!initialized) { + InitializeFiles(); + } + + // Read enough data files + while (file_id >= data_files.size()) { + if (reader_state.finished) { + if (current_data_manifest == data_manifests.end()) { + break; + } + auto &manifest = *current_data_manifest; + reader_state = ManifestEntryReaderState(*manifest); + } + + auto new_entry = data_manifest_entry_reader->GetNext(reader_state); + if (!new_entry) { + D_ASSERT(reader_state.finished); + current_data_manifest++; + continue; + } + if (new_entry->status == IcebergManifestEntryStatusType::DELETED) { + // Skip deleted files + continue; + } + D_ASSERT(new_entry->content == IcebergManifestEntryContentType::DATA); + data_files.push_back(std::move(*new_entry)); + } + + if (file_id >= data_files.size()) { + return string(); + } + + D_ASSERT(file_id < data_files.size()); + auto &data_file = data_files[file_id]; + auto &path = data_file.file_path; + + if (options.allow_moved_paths) { + auto iceberg_path = GetPath(); + auto &fs = FileSystem::GetFileSystem(context); + return IcebergUtils::GetFullPath(iceberg_path, path, fs); + } else { + return path; + } +} + +void IcebergMultiFileList::InitializeFiles() { + lock_guard guard(lock); + if (initialized) { + return; + } + initialized = true; + + //! 
Load the snapshot + auto iceberg_path = GetPath(); + auto &fs = FileSystem::GetFileSystem(context); + auto iceberg_meta_path = IcebergSnapshot::GetMetaDataPath(context, iceberg_path, fs, options); + switch (options.snapshot_source) { + case SnapshotSource::LATEST: { + snapshot = IcebergSnapshot::GetLatestSnapshot(iceberg_meta_path, fs, options); + break; + } + case SnapshotSource::FROM_ID: { + snapshot = IcebergSnapshot::GetSnapshotById(iceberg_meta_path, fs, options.snapshot_id, options); + break; + } + case SnapshotSource::FROM_TIMESTAMP: { + snapshot = IcebergSnapshot::GetSnapshotByTimestamp(iceberg_meta_path, fs, options.snapshot_timestamp, options); + break; + } + default: + throw InternalException("SnapshotSource type not implemented"); + } + + //! Set up the manifest + manifest entry readers + if (snapshot.iceberg_format_version == 1) { + data_manifest_entry_reader = + make_uniq(iceberg_path, snapshot.manifest_list, fs, options); + delete_manifest_entry_reader = + make_uniq(iceberg_path, snapshot.manifest_list, fs, options); + manifest_reader = make_uniq(iceberg_path, snapshot.manifest_list, fs, options); + } else if (snapshot.iceberg_format_version == 2) { + data_manifest_entry_reader = + make_uniq(iceberg_path, snapshot.manifest_list, fs, options); + delete_manifest_entry_reader = + make_uniq(iceberg_path, snapshot.manifest_list, fs, options); + manifest_reader = make_uniq(iceberg_path, snapshot.manifest_list, fs, options); + } else { + throw InvalidInputException("Reading from Iceberg version %d is not supported yet", snapshot.iceberg_format_version); + } + + // Read the manifest list, we need all the manifests to determine if we've seen all deletes + while (!manifest_reader->Finished()) { + auto manifest = manifest_reader->GetNext(); + if (!manifest) { + break; + } + if (manifest->content == IcebergManifestContentType::DATA) { + data_manifests.push_back(std::move(manifest)); + } else { + D_ASSERT(manifest->content == IcebergManifestContentType::DELETE); + delete_manifests.push_back(std::move(manifest)); + } + } + current_data_manifest = data_manifests.begin(); + current_delete_manifest = delete_manifests.begin(); +} + +//! Multi File Reader + +unique_ptr IcebergMultiFileReader::CreateInstance(const TableFunction &table) { + (void)table; + return make_uniq(); +} + +shared_ptr IcebergMultiFileReader::CreateFileList(ClientContext &context, const vector &paths, + FileGlobOptions options) { + if (paths.size() != 1) { + throw BinderException("'iceberg_scan' only supports single path as input"); + } + return make_shared_ptr(context, paths[0], this->options); +} + +bool IcebergMultiFileReader::Bind(MultiFileReaderOptions &options, MultiFileList &files, + vector &return_types, vector &names, + MultiFileReaderBindData &bind_data) { + auto &iceberg_multi_file_list = dynamic_cast(files); + + iceberg_multi_file_list.Bind(return_types, names); + // FIXME: apply final transformation for 'file_row_number' ??? 
+ + auto &schema = iceberg_multi_file_list.snapshot.schema; + auto &columns = bind_data.schema; + for (auto &item : schema) { + MultiFileReaderColumnDefinition column(item.name, item.type); + column.default_expression = make_uniq(item.default_value); + column.identifier = Value::INTEGER(item.id); + + columns.push_back(column); + } + bind_data.file_row_number_idx = names.size(); + bind_data.mapping = MultiFileReaderColumnMappingMode::BY_FIELD_ID; + return true; +} + +void IcebergMultiFileReader::BindOptions(MultiFileReaderOptions &options, MultiFileList &files, + vector &return_types, vector &names, + MultiFileReaderBindData &bind_data) { + // Disable all other multifilereader options + options.auto_detect_hive_partitioning = false; + options.hive_partitioning = false; + options.union_by_name = false; + + MultiFileReader::BindOptions(options, files, return_types, names, bind_data); +} + +void IcebergMultiFileReader::CreateColumnMapping(const string &file_name, + const vector &local_columns, + const vector &global_columns, + const vector &global_column_ids, + MultiFileReaderData &reader_data, + const MultiFileReaderBindData &bind_data, const string &initial_file, + optional_ptr global_state_p) { + + D_ASSERT(bind_data.mapping == MultiFileReaderColumnMappingMode::BY_FIELD_ID); + MultiFileReader::CreateColumnMappingByFieldId(file_name, local_columns, global_columns, global_column_ids, + reader_data, bind_data, initial_file, global_state_p); + + auto &global_state = global_state_p->Cast(); + // Check if the file_row_number column is an "extra_column" which is not part of the projection + if (!global_state.file_row_number_idx.IsValid()) { + return; + } + auto file_row_number_idx = global_state.file_row_number_idx.GetIndex(); + if (file_row_number_idx >= global_column_ids.size()) { + // Build the name map + case_insensitive_map_t name_map; + for (idx_t col_idx = 0; col_idx < local_columns.size(); col_idx++) { + name_map[local_columns[col_idx].name] = col_idx; + } + + // Lookup the required column in the local map + auto entry = name_map.find("file_row_number"); + if (entry == name_map.end()) { + throw IOException("Failed to find the file_row_number column"); + } + + // Register the column to be scanned from this file + reader_data.column_ids.push_back(entry->second); + reader_data.column_indexes.emplace_back(entry->second); + reader_data.column_mapping.push_back(file_row_number_idx); + } + + // This may have changed: update it + reader_data.empty_columns = reader_data.column_ids.empty(); + +} + +unique_ptr +IcebergMultiFileReader::InitializeGlobalState(ClientContext &context, const MultiFileReaderOptions &file_options, + const MultiFileReaderBindData &bind_data, const MultiFileList &file_list, + const vector &global_columns, + const vector &global_column_ids) { + + vector extra_columns; + // Map of column_name -> column_index + vector> mapped_columns; + + // TODO: only add file_row_number column if there are deletes + case_insensitive_map_t columns_to_map = { + {"file_row_number", LogicalType::BIGINT}, + }; + + // Create a map of the columns that are in the projection + // So we can detect that the projection already contains the 'extra_column' below + case_insensitive_map_t selected_columns; + for (idx_t i = 0; i < global_column_ids.size(); i++) { + auto global_id = global_column_ids[i]; + if (global_id.IsRowIdColumn()) { + continue; + } + + auto &global_name = global_columns[global_id.GetPrimaryIndex()].name; + selected_columns.insert({global_name, i}); + } + + // Map every column to either a 
column in the projection, or add it to the extra columns if it doesn't exist + idx_t col_offset = 0; + for (const auto &extra_column : columns_to_map) { + // First check if the column is in the projection + auto res = selected_columns.find(extra_column.first); + if (res != selected_columns.end()) { + // The column is in the projection, no special handling is required; we simply store the index + mapped_columns.push_back({extra_column.first, res->second}); + continue; + } + + // The column is NOT in the projection: it needs to be added as an extra_column + + // Calculate the index of the added column (extra columns are added after all other columns) + idx_t current_col_idx = global_column_ids.size() + col_offset++; + + // Add column to the map, to ensure the MultiFileReader can find it when processing the Chunk + mapped_columns.push_back({extra_column.first, current_col_idx}); + + // Ensure the result DataChunk has a vector of the correct type to store this column + extra_columns.push_back(extra_column.second); + } + + auto res = make_uniq(extra_columns, file_list); + + // Parse all the mapped columns into the DeltaMultiFileReaderGlobalState for easy use; + for (const auto &mapped_column : mapped_columns) { + auto &column_name = mapped_column.first; + auto column_index = mapped_column.second; + if (StringUtil::CIEquals(column_name, "file_row_number")) { + if (res->file_row_number_idx.IsValid()) { + throw InvalidInputException("'file_row_number' already set!"); + } + res->file_row_number_idx = column_index; + } else { + throw InternalException("Extra column type not handled"); + } + } + return std::move(res); +} + +void IcebergMultiFileReader::FinalizeBind(const MultiFileReaderOptions &file_options, + const MultiFileReaderBindData &options, const string &filename, + const vector &local_columns, + const vector &global_columns, + const vector &global_column_ids, MultiFileReaderData &reader_data, + ClientContext &context, optional_ptr global_state) { + MultiFileReader::FinalizeBind(file_options, options, filename, local_columns, global_columns, + global_column_ids, reader_data, context, global_state); + return; +} + +void IcebergMultiFileList::ScanDeleteFile(const string &delete_file_path) const { + auto &instance = DatabaseInstance::GetDatabase(context); + auto &parquet_scan_entry = ExtensionUtil::GetTableFunction(instance, "parquet_scan"); + auto &parquet_scan = parquet_scan_entry.functions.functions[0]; + + // Prepare the inputs for the bind + vector children; + children.reserve(1); + children.push_back(Value(delete_file_path)); + named_parameter_map_t named_params; + vector input_types; + vector input_names; + + TableFunctionRef empty; + TableFunction dummy_table_function; + dummy_table_function.name = "IcebergDeleteScan"; + TableFunctionBindInput bind_input(children, named_params, input_types, input_names, nullptr, nullptr, + dummy_table_function, empty); + vector return_types; + vector return_names; + + auto bind_data = parquet_scan.bind(context, bind_input, return_types, return_names); + + DataChunk result; + // Reserve for STANDARD_VECTOR_SIZE instead of count, in case the returned table contains too many tuples + result.Initialize(context, return_types, STANDARD_VECTOR_SIZE); + + ThreadContext thread_context(context); + ExecutionContext execution_context(context, thread_context, nullptr); + + vector column_ids; + for (idx_t i = 0; i < return_types.size(); i++) { + column_ids.push_back(i); + } + TableFunctionInitInput input(bind_data.get(), column_ids, vector(), nullptr); + auto 
global_state = parquet_scan.init_global(context, input); + auto local_state = parquet_scan.init_local(execution_context, input, global_state.get()); + + do { + TableFunctionInput function_input(bind_data.get(), local_state.get(), global_state.get()); + result.Reset(); + parquet_scan.function(context, function_input, result); + + idx_t count = result.size(); + for (auto &vec : result.data) { + vec.Flatten(count); + } + + auto names = FlatVector::GetData(result.data[0]); + auto row_ids = FlatVector::GetData(result.data[1]); + + if (count == 0) { + continue; + } + reference current_file_path = names[0]; + reference deletes = delete_data[current_file_path.get().GetString()]; + + for (idx_t i = 0; i < count; i++) { + auto &name = names[i]; + auto &row_id = row_ids[i]; + + if (name != current_file_path.get()) { + current_file_path = name; + deletes = delete_data[current_file_path.get().GetString()]; + } + + deletes.get().AddRow(row_id); + } + } while (result.size() != 0); +} + +optional_ptr IcebergMultiFileList::GetDeletesForFile(const string &file_path) const { + auto it = delete_data.find(file_path); + if (it != delete_data.end()) { + // There is delete data for this file, return it + auto &deletes = it->second; + return deletes; + } + return nullptr; +} + +void IcebergMultiFileList::ProcessDeletes() const { + // In <=v2 we now have to process *all* delete manifests + // before we can be certain that we have all the delete data for the current file. + + // v3 solves this, `referenced_data_file` will tell us which file the `data_file` + // is targeting before we open it, and there can only be one deletion vector per data file. + + // From the spec: "At most one deletion vector is allowed per data file in a snapshot" + + ManifestEntryReaderState reader_state; + while (current_delete_manifest != delete_manifests.end()) { + if (reader_state.finished) { + auto &manifest = *current_delete_manifest; + reader_state = ManifestEntryReaderState(*manifest); + } + + auto new_entry = delete_manifest_entry_reader->GetNext(reader_state); + if (!new_entry) { + D_ASSERT(reader_state.finished); + current_delete_manifest++; + continue; + } + if (new_entry->status == IcebergManifestEntryStatusType::DELETED) { + // Skip deleted files + continue; + } + D_ASSERT(new_entry->content != IcebergManifestEntryContentType::DATA); + //! 
FIXME: with v3 we can check from the metadata whether this targets our file + // we can avoid (read: delay) materializing the file in that case + ScanDeleteFile(new_entry->file_path); + } + + D_ASSERT(current_delete_manifest == delete_manifests.end()); +} + +void IcebergMultiFileReader::FinalizeChunk(ClientContext &context, const MultiFileReaderBindData &bind_data, + const MultiFileReaderData &reader_data, DataChunk &chunk, + optional_ptr global_state) { + // Base class finalization first + MultiFileReader::FinalizeChunk(context, bind_data, reader_data, chunk, global_state); + + D_ASSERT(global_state); + auto &iceberg_global_state = global_state->Cast(); + D_ASSERT(iceberg_global_state.file_list); + + // Get the metadata for this file + const auto &multi_file_list = dynamic_cast(*global_state->file_list); + auto file_id = reader_data.file_list_idx.GetIndex(); + auto &data_file = multi_file_list.data_files[file_id]; + + // The path of the data file where this chunk was read from + auto &file_path = data_file.file_path; + optional_ptr delete_data; + { + std::lock_guard guard(multi_file_list.delete_lock); + if (multi_file_list.current_delete_manifest != multi_file_list.delete_manifests.end()) { + multi_file_list.ProcessDeletes(); + } + delete_data = multi_file_list.GetDeletesForFile(file_path); + } + + //! FIXME: how can we retrieve which rows these were in the file? + // Looks like delta does this by adding an extra projection so the chunk has a file_row_id column + if (delete_data) { + D_ASSERT(iceberg_global_state.file_row_number_idx.IsValid()); + auto &file_row_number_column = chunk.data[iceberg_global_state.file_row_number_idx.GetIndex()]; + + delete_data->Apply(chunk, file_row_number_column); + } +} + +bool IcebergMultiFileReader::ParseOption(const string &key, const Value &val, MultiFileReaderOptions &options, + ClientContext &context) { + auto loption = StringUtil::Lower(key); + if (loption == "allow_moved_paths") { + this->options.allow_moved_paths = BooleanValue::Get(val); + return true; + } + if (loption == "metadata_compression_codec") { + this->options.metadata_compression_codec = StringValue::Get(val); + return true; + } + if (loption == "skip_schema_inference") { + this->options.skip_schema_inference = BooleanValue::Get(val); + return true; + } + if (loption == "version") { + this->options.table_version = StringValue::Get(val); + return true; + } + if (loption == "version_name_format") { + this->options.version_name_format = StringValue::Get(val); + return true; + } + if (loption == "snapshot_from_id") { + if (this->options.snapshot_source != SnapshotSource::LATEST) { + throw InvalidInputException("Can't use 'snapshot_from_id' in combination with 'snapshot_from_timestamp'"); + } + this->options.snapshot_source = SnapshotSource::FROM_ID; + this->options.snapshot_id = val.GetValue(); + return true; + } + if (loption == "snapshot_from_timestamp") { + if (this->options.snapshot_source != SnapshotSource::LATEST) { + throw InvalidInputException("Can't use 'snapshot_from_id' in combination with 'snapshot_from_timestamp'"); + } + this->options.snapshot_source = SnapshotSource::FROM_TIMESTAMP; + this->options.snapshot_timestamp = val.GetValue(); + return true; + } + return MultiFileReader::ParseOption(key, val, options, context); +} + +} // namespace duckdb diff --git a/src/iceberg_functions/iceberg_scan.cpp b/src/iceberg_functions/iceberg_scan.cpp index e819c45..b2c1d6e 100644 --- a/src/iceberg_functions/iceberg_scan.cpp +++ b/src/iceberg_functions/iceberg_scan.cpp @@ -19,8 +19,10 @@ 
#include "duckdb/planner/operator/logical_comparison_join.hpp" #include "duckdb/common/file_opener.hpp" #include "duckdb/common/file_system.hpp" +#include "duckdb/main/extension_util.hpp" #include "iceberg_metadata.hpp" #include "iceberg_utils.hpp" +#include "iceberg_multi_file_reader.hpp" #include "iceberg_functions.hpp" #include "yyjson.hpp" @@ -29,311 +31,45 @@ namespace duckdb { -struct IcebergScanGlobalTableFunctionState : public GlobalTableFunctionState { -public: - static unique_ptr Init(ClientContext &context, TableFunctionInitInput &input) { - return make_uniq(); - } -}; - -static unique_ptr GetFilenameExpr(unique_ptr colref_expr) { - vector> split_children; - split_children.push_back(std::move(colref_expr)); - split_children.push_back(make_uniq(Value("/"))); - auto data_split = make_uniq("string_split", std::move(split_children)); - - vector> list_extract_children; - list_extract_children.push_back(std::move(data_split)); - list_extract_children.push_back(make_uniq(Value(-1))); - auto list_extract_expr = make_uniq("list_extract", std::move(list_extract_children)); - - return std::move(list_extract_expr); -} - -static unique_ptr GetFilenameMatchExpr() { - auto data_colref_expr = make_uniq("filename", "iceberg_scan_data"); - auto delete_colref_expr = make_uniq("file_path", "iceberg_scan_deletes"); - - auto data_filename_expr = GetFilenameExpr(std::move(data_colref_expr)); - auto delete_filename_expr = GetFilenameExpr(std::move(delete_colref_expr)); - - return make_uniq(ExpressionType::COMPARE_NOT_DISTINCT_FROM, std::move(data_filename_expr), - std::move(delete_filename_expr)); -}; - -//! Uses recursive unnest on list of structs to return a table with all data and delete files -//! TODO: refactor, probably. -static unique_ptr MakeListFilesExpression(vector &data_file_values, - vector &delete_file_values) { - vector structs; - for (const auto &file : data_file_values) { - child_list_t child; - child.emplace_back(make_pair("file", file)); - child.emplace_back(make_pair("type", Value("data"))); - structs.push_back(Value::STRUCT(child)); - } - for (const auto &file : delete_file_values) { - child_list_t child; - child.emplace_back(make_pair("file", file)); - child.emplace_back(make_pair("type", Value("delete"))); - structs.push_back(Value::STRUCT(child)); - } - - // Unnest - vector> unnest_children; - unnest_children.push_back(make_uniq(Value::LIST(structs))); - auto recursive_named_param = make_uniq(Value::BOOLEAN(true)); - recursive_named_param->alias = "recursive"; - unnest_children.push_back(std::move(recursive_named_param)); - - // Select node - auto select_node = make_uniq(); - vector> select_exprs; - select_exprs.emplace_back(make_uniq("unnest", std::move(unnest_children))); - select_node->select_list = std::move(select_exprs); - select_node->from_table = make_uniq(); - - // Select statement - auto select_statement = make_uniq(); - select_statement->node = std::move(select_node); - return make_uniq(std::move(select_statement), "iceberg_scan"); -} - -// Create the param for passing the iceberg schema to the parquet reader as a DuckDB map -static Value GetParquetSchemaParam(vector &schema) { - vector map_entries; - - for (auto &schema_entry : schema) { - child_list_t map_value_children; - map_value_children.push_back(make_pair("name", Value(schema_entry.name))); - map_value_children.push_back(make_pair("type", Value(schema_entry.type.ToString()))); - map_value_children.push_back(make_pair("default_value", schema_entry.default_value)); - auto map_value = 
Value::STRUCT(map_value_children); - - child_list_t map_entry_children; - map_entry_children.push_back(make_pair("key", schema_entry.id)); - map_entry_children.push_back(make_pair("values", map_value)); - auto map_entry = Value::STRUCT(map_entry_children); - - map_entries.push_back(map_entry); - } - - auto param_type = - LogicalType::STRUCT({{"key", LogicalType::INTEGER}, - {"value", LogicalType::STRUCT({{{"name", LogicalType::VARCHAR}, - {"type", LogicalType::VARCHAR}, - {"default_value", LogicalType::VARCHAR}}})}}); - auto ret = Value::MAP(param_type, map_entries); - return ret; -} - -//! Build the Parquet Scan expression for the files we need to scan -static unique_ptr MakeScanExpression(vector &data_file_values, vector &delete_file_values, - vector &schema, bool allow_moved_paths, - string metadata_compression_codec, bool skip_schema_inference, - int64_t data_cardinality, int64_t delete_cardinality) { - - auto cardinality = make_uniq(ExpressionType::COMPARE_EQUAL, make_uniq("explicit_cardinality"), - make_uniq(Value(data_cardinality))); - - // No deletes, just return a TableFunctionRef for a parquet scan of the data files - if (delete_file_values.empty()) { - auto table_function_ref_data = make_uniq(); - table_function_ref_data->alias = "iceberg_scan_data"; - vector> left_children; - left_children.push_back(make_uniq(Value::LIST(data_file_values))); - left_children.push_back(std::move(cardinality)); - if (!skip_schema_inference) { - left_children.push_back( - make_uniq(ExpressionType::COMPARE_EQUAL, make_uniq("schema"), - make_uniq(GetParquetSchemaParam(schema)))); - } - - table_function_ref_data->function = make_uniq("parquet_scan", std::move(left_children)); - return std::move(table_function_ref_data); - } - - // Join - auto join_node = make_uniq(JoinRefType::REGULAR); - auto filename_match_expr = - allow_moved_paths - ? 
GetFilenameMatchExpr() - : make_uniq(ExpressionType::COMPARE_NOT_DISTINCT_FROM, - make_uniq("filename", "iceberg_scan_data"), - make_uniq("file_path", "iceberg_scan_deletes")); - join_node->type = JoinType::ANTI; - join_node->condition = make_uniq( - ExpressionType::CONJUNCTION_AND, std::move(filename_match_expr), - make_uniq(ExpressionType::COMPARE_NOT_DISTINCT_FROM, - make_uniq("file_row_number", "iceberg_scan_data"), - make_uniq("pos", "iceberg_scan_deletes"))); - - // LHS: data - auto table_function_ref_data = make_uniq(); - table_function_ref_data->alias = "iceberg_scan_data"; - vector> left_children; - left_children.push_back(make_uniq(Value::LIST(data_file_values))); - left_children.push_back(std::move(cardinality)); - left_children.push_back(make_uniq(ExpressionType::COMPARE_EQUAL, - make_uniq("filename"), - make_uniq(Value(1)))); - left_children.push_back(make_uniq(ExpressionType::COMPARE_EQUAL, - make_uniq("file_row_number"), - make_uniq(Value(1)))); - if (!skip_schema_inference) { - left_children.push_back( - make_uniq(ExpressionType::COMPARE_EQUAL, make_uniq("schema"), - make_uniq(GetParquetSchemaParam(schema)))); - } - table_function_ref_data->function = make_uniq("parquet_scan", std::move(left_children)); - join_node->left = std::move(table_function_ref_data); - - // RHS: deletes - auto table_function_ref_deletes = make_uniq(); - table_function_ref_deletes->alias = "iceberg_scan_deletes"; - vector> right_children; - right_children.push_back(make_uniq(Value::LIST(delete_file_values))); - right_children.push_back(make_uniq(ExpressionType::COMPARE_EQUAL, make_uniq("explicit_cardinality"), - make_uniq(Value(delete_cardinality)))); - table_function_ref_deletes->function = make_uniq("parquet_scan", std::move(right_children)); - join_node->right = std::move(table_function_ref_deletes); - - // Wrap the join in a select, exclude the filename and file_row_number cols - auto select_statement = make_uniq(); - - // Construct Select node - auto select_node = make_uniq(); - select_node->from_table = std::move(join_node); - auto select_expr = make_uniq(); - select_expr->exclude_list = {QualifiedColumnName("filename"), QualifiedColumnName("file_row_number")}; - vector> select_exprs; - select_exprs.push_back(std::move(select_expr)); - select_node->select_list = std::move(select_exprs); - select_statement->node = std::move(select_node); - - return make_uniq(std::move(select_statement), "iceberg_scan"); -} - -static unique_ptr IcebergScanBindReplace(ClientContext &context, TableFunctionBindInput &input) { - FileSystem &fs = FileSystem::GetFileSystem(context); - auto iceberg_path = input.inputs[0].ToString(); - - // Enabling this will ensure the ANTI Join with the deletes only looks at filenames, instead of full paths - // this allows hive tables to be moved and have mismatching paths, usefull for testing, but will have worse - // performance - bool allow_moved_paths = false; - bool skip_schema_inference = false; - string mode = "default"; - string metadata_compression_codec = "none"; - string table_version = DEFAULT_TABLE_VERSION; - string version_name_format = DEFAULT_TABLE_VERSION_FORMAT; - - for (auto &kv : input.named_parameters) { - auto loption = StringUtil::Lower(kv.first); - if (loption == "allow_moved_paths") { - allow_moved_paths = BooleanValue::Get(kv.second); - if (StringUtil::EndsWith(iceberg_path, ".json")) { - throw InvalidInputException( - "Enabling allow_moved_paths is not enabled for directly scanning metadata files."); - } - } else if (loption == "mode") { - mode = 
StringValue::Get(kv.second); - } else if (loption == "metadata_compression_codec") { - metadata_compression_codec = StringValue::Get(kv.second); - } else if (loption == "skip_schema_inference") { - skip_schema_inference = BooleanValue::Get(kv.second); - } else if (loption == "version") { - table_version = StringValue::Get(kv.second); - } else if (loption == "version_name_format") { - version_name_format = StringValue::Get(kv.second); - } - } - auto iceberg_meta_path = IcebergSnapshot::GetMetaDataPath(context, iceberg_path, fs, metadata_compression_codec, table_version, version_name_format); - IcebergSnapshot snapshot_to_scan; - if (input.inputs.size() > 1) { - if (input.inputs[1].type() == LogicalType::UBIGINT) { - snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_meta_path, fs, input.inputs[1].GetValue(), metadata_compression_codec, skip_schema_inference); - } else if (input.inputs[1].type() == LogicalType::TIMESTAMP) { - snapshot_to_scan = - IcebergSnapshot::GetSnapshotByTimestamp(iceberg_meta_path, fs, input.inputs[1].GetValue(), metadata_compression_codec, skip_schema_inference); - } else { - throw InvalidInputException("Unknown argument type in IcebergScanBindReplace."); - } - } else { - snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_meta_path, fs, metadata_compression_codec, skip_schema_inference); - } - - IcebergTable iceberg_table = IcebergTable::Load(iceberg_path, snapshot_to_scan, fs, allow_moved_paths, metadata_compression_codec); - auto data_files = iceberg_table.GetPaths(); - auto delete_files = iceberg_table.GetPaths(); - vector data_file_values; - for (auto &data_file : data_files) { - data_file_values.push_back( - {allow_moved_paths ? IcebergUtils::GetFullPath(iceberg_path, data_file, fs) : data_file}); - } - vector delete_file_values; - for (auto &delete_file : delete_files) { - delete_file_values.push_back( - {allow_moved_paths ? 
IcebergUtils::GetFullPath(iceberg_path, delete_file, fs) : delete_file}); - } - - if (mode == "list_files") { - return MakeListFilesExpression(data_file_values, delete_file_values); - } else if (mode == "default") { - int64_t data_cardinality = 0, delete_cardinality = 0; - for(auto &manifest : iceberg_table.entries) { - for(auto &entry : manifest.manifest_entries) { - if (entry.status != IcebergManifestEntryStatusType::DELETED) { - if (entry.content == IcebergManifestEntryContentType::DATA) { - data_cardinality += entry.record_count; - } else { // DELETES - delete_cardinality += entry.record_count; - } - } - } - } - return MakeScanExpression(data_file_values, delete_file_values, snapshot_to_scan.schema, allow_moved_paths, metadata_compression_codec, skip_schema_inference, data_cardinality, delete_cardinality); - } else { - throw NotImplementedException("Unknown mode type for ICEBERG_SCAN bind : '" + mode + "'"); - } -} - -TableFunctionSet IcebergFunctions::GetIcebergScanFunction() { - TableFunctionSet function_set("iceberg_scan"); - - auto fun = TableFunction({LogicalType::VARCHAR}, nullptr, nullptr, IcebergScanGlobalTableFunctionState::Init); - fun.bind_replace = IcebergScanBindReplace; +static void AddNamedParameters(TableFunction &fun) { fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN; fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN; fun.named_parameters["mode"] = LogicalType::VARCHAR; fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR; fun.named_parameters["version"] = LogicalType::VARCHAR; fun.named_parameters["version_name_format"] = LogicalType::VARCHAR; - function_set.AddFunction(fun); + fun.named_parameters["snapshot_from_timestamp"] = LogicalType::TIMESTAMP; + fun.named_parameters["snapshot_from_id"] = LogicalType::UBIGINT; +} - fun = TableFunction({LogicalType::VARCHAR, LogicalType::UBIGINT}, nullptr, nullptr, - IcebergScanGlobalTableFunctionState::Init); - fun.bind_replace = IcebergScanBindReplace; - fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN; - fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN; - fun.named_parameters["mode"] = LogicalType::VARCHAR; - fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR; - fun.named_parameters["version"] = LogicalType::VARCHAR; - fun.named_parameters["version_name_format"] = LogicalType::VARCHAR; - function_set.AddFunction(fun); +TableFunctionSet IcebergFunctions::GetIcebergScanFunction(DatabaseInstance &instance) { + // The iceberg_scan function is constructed by grabbing the parquet scan from the Catalog, then injecting the + // IcebergMultiFileReader into it to create a Iceberg-based multi file read - fun = TableFunction({LogicalType::VARCHAR, LogicalType::TIMESTAMP}, nullptr, nullptr, - IcebergScanGlobalTableFunctionState::Init); - fun.bind_replace = IcebergScanBindReplace; - fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN; - fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN; - fun.named_parameters["mode"] = LogicalType::VARCHAR; - fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR; - fun.named_parameters["version"] = LogicalType::VARCHAR; - fun.named_parameters["version_name_format"] = LogicalType::VARCHAR; - function_set.AddFunction(fun); + auto &parquet_scan = ExtensionUtil::GetTableFunction(instance, "parquet_scan"); + auto parquet_scan_copy = parquet_scan.functions; + + for (auto &function : parquet_scan_copy.functions) { + // Register the MultiFileReader 
as the driver for reads + function.get_multi_file_reader = IcebergMultiFileReader::CreateInstance; + + // Unset all of these: they are either broken, very inefficient. + // TODO: implement/fix these + function.serialize = nullptr; + function.deserialize = nullptr; + function.statistics = nullptr; + function.table_scan_progress = nullptr; + function.get_bind_info = nullptr; + + // Schema param is just confusing here + function.named_parameters.erase("schema"); + AddNamedParameters(function); + + function.name = "iceberg_scan"; + } - return function_set; + parquet_scan_copy.name = "iceberg_scan"; + return parquet_scan_copy; } } // namespace duckdb diff --git a/src/iceberg_functions/iceberg_snapshots.cpp b/src/iceberg_functions/iceberg_snapshots.cpp index e6d8e54..d754357 100644 --- a/src/iceberg_functions/iceberg_snapshots.cpp +++ b/src/iceberg_functions/iceberg_snapshots.cpp @@ -1,8 +1,9 @@ #include "duckdb/common/file_opener.hpp" #include "duckdb/common/file_system.hpp" +#include "iceberg_functions.hpp" #include "iceberg_metadata.hpp" +#include "iceberg_options.hpp" #include "iceberg_utils.hpp" -#include "iceberg_functions.hpp" #include "yyjson.hpp" #include @@ -12,10 +13,7 @@ namespace duckdb { struct IcebergSnaphotsBindData : public TableFunctionData { IcebergSnaphotsBindData() {}; string filename; - string metadata_compression_codec; - string table_version; - string version_name_format; - bool skip_schema_inference = false; + IcebergOptions options; }; struct IcebergSnapshotGlobalTableFunctionState : public GlobalTableFunctionState { @@ -26,15 +24,16 @@ struct IcebergSnapshotGlobalTableFunctionState : public GlobalTableFunctionState } } static unique_ptr Init(ClientContext &context, TableFunctionInitInput &input) { - + auto bind_data = input.bind_data->Cast(); auto global_state = make_uniq(); - + FileSystem &fs = FileSystem::GetFileSystem(context); - auto iceberg_meta_path = IcebergSnapshot::GetMetaDataPath( - context, bind_data.filename, fs, bind_data.metadata_compression_codec, bind_data.table_version, bind_data.version_name_format); - global_state->metadata_file = IcebergSnapshot::ReadMetaData(iceberg_meta_path, fs, bind_data.metadata_compression_codec); + auto iceberg_meta_path = + IcebergSnapshot::GetMetaDataPath(context, bind_data.filename, fs, bind_data.options); + global_state->metadata_file = + IcebergSnapshot::ReadMetaData(iceberg_meta_path, fs, bind_data.options.metadata_compression_codec); global_state->metadata_doc = yyjson_read(global_state->metadata_file.c_str(), global_state->metadata_file.size(), 0); auto root = yyjson_doc_get_root(global_state->metadata_doc); @@ -53,29 +52,20 @@ struct IcebergSnapshotGlobalTableFunctionState : public GlobalTableFunctionState static unique_ptr IcebergSnapshotsBind(ClientContext &context, TableFunctionBindInput &input, vector &return_types, vector &names) { auto bind_data = make_uniq(); - - string metadata_compression_codec = "none"; - string table_version = DEFAULT_TABLE_VERSION; - string version_name_format = DEFAULT_TABLE_VERSION_FORMAT; - bool skip_schema_inference = false; - + for (auto &kv : input.named_parameters) { auto loption = StringUtil::Lower(kv.first); if (loption == "metadata_compression_codec") { - metadata_compression_codec = StringValue::Get(kv.second); + bind_data->options.metadata_compression_codec = StringValue::Get(kv.second); } else if (loption == "version") { - table_version = StringValue::Get(kv.second); + bind_data->options.table_version = StringValue::Get(kv.second); } else if (loption == "version_name_format") 
{ - version_name_format = StringValue::Get(kv.second); + bind_data->options.version_name_format = StringValue::Get(kv.second); } else if (loption == "skip_schema_inference") { - skip_schema_inference = BooleanValue::Get(kv.second); + bind_data->options.skip_schema_inference = BooleanValue::Get(kv.second); } } bind_data->filename = input.inputs[0].ToString(); - bind_data->metadata_compression_codec = metadata_compression_codec; - bind_data->skip_schema_inference = skip_schema_inference; - bind_data->table_version = table_version; - bind_data->version_name_format = version_name_format; names.emplace_back("sequence_number"); return_types.emplace_back(LogicalType::UBIGINT); @@ -92,10 +82,6 @@ static unique_ptr IcebergSnapshotsBind(ClientContext &context, Tab return std::move(bind_data); } -static void IcebergSnapshotsFunction(ClientContext &context, TableFunctionInput &data, - vector &return_types, vector &names) { - -} // Snapshots function static void IcebergSnapshotsFunction(ClientContext &context, TableFunctionInput &data, DataChunk &output) { auto &global_state = data.global_state->Cast(); @@ -106,11 +92,9 @@ static void IcebergSnapshotsFunction(ClientContext &context, TableFunctionInput break; } - auto parse_info = IcebergSnapshot::GetParseInfo(*global_state.metadata_doc); auto snapshot = IcebergSnapshot::ParseSnapShot(next_snapshot, global_state.iceberg_format_version, - parse_info->schema_id, parse_info->schemas, bind_data.metadata_compression_codec, - bind_data.skip_schema_inference); + parse_info->schema_id, parse_info->schemas, bind_data.options); FlatVector::GetData(output.data[0])[i] = snapshot.sequence_number; FlatVector::GetData(output.data[1])[i] = snapshot.snapshot_id; diff --git a/src/include/iceberg_functions.hpp b/src/include/iceberg_functions.hpp index 1f0a053..cd85a8d 100644 --- a/src/include/iceberg_functions.hpp +++ b/src/include/iceberg_functions.hpp @@ -16,12 +16,12 @@ namespace duckdb { class IcebergFunctions { public: - static vector GetTableFunctions(); + static vector GetTableFunctions(DatabaseInstance &instance); static vector GetScalarFunctions(); private: static TableFunctionSet GetIcebergSnapshotsFunction(); - static TableFunctionSet GetIcebergScanFunction(); + static TableFunctionSet GetIcebergScanFunction(DatabaseInstance &instance); static TableFunctionSet GetIcebergMetadataFunction(); }; diff --git a/src/include/iceberg_metadata.hpp b/src/include/iceberg_metadata.hpp index dd90f6c..ddd34b1 100644 --- a/src/include/iceberg_metadata.hpp +++ b/src/include/iceberg_metadata.hpp @@ -11,30 +11,12 @@ #include "duckdb.hpp" #include "yyjson.hpp" #include "iceberg_types.hpp" +#include "iceberg_options.hpp" using namespace duckdb_yyjson; namespace duckdb { -static string VERSION_GUESSING_CONFIG_VARIABLE = "unsafe_enable_version_guessing"; - -// When this is provided (and unsafe_enable_version_guessing is true) -// we first look for DEFAULT_VERSION_HINT_FILE, if it doesn't exist we -// then search for versions matching the DEFAULT_TABLE_VERSION_FORMAT -// We take the lexographically "greatest" one as the latest version -// Note that this will voliate ACID constraints in some situations. 
-static string UNKNOWN_TABLE_VERSION = "?"; - -// First arg is version string, arg is either empty or ".gz" if gzip -// Allows for both "v###.gz.metadata.json" and "###.metadata.json" styles -static string DEFAULT_TABLE_VERSION_FORMAT = "v%s%s.metadata.json,%s%s.metadata.json"; - -// This isn't explicitly in the standard, but is a commonly used technique -static string DEFAULT_VERSION_HINT_FILE = "version-hint.text"; - -// By default we will use the unknown version behavior mentioned above -static string DEFAULT_TABLE_VERSION = UNKNOWN_TABLE_VERSION; - struct IcebergColumnDefinition { public: static IcebergColumnDefinition ParseFromJson(yyjson_val *val); @@ -79,15 +61,15 @@ class IcebergSnapshot { uint64_t schema_id; vector schema; string metadata_compression_codec = "none"; - - static IcebergSnapshot GetLatestSnapshot(const string &path, FileSystem &fs, string metadata_compression_codec, bool skip_schema_inference); - static IcebergSnapshot GetSnapshotById(const string &path, FileSystem &fs, idx_t snapshot_id, string metadata_compression_codec, bool skip_schema_inference); - static IcebergSnapshot GetSnapshotByTimestamp(const string &path, FileSystem &fs, timestamp_t timestamp, string metadata_compression_codec, bool skip_schema_inference); +public: + static IcebergSnapshot GetLatestSnapshot(const string &path, FileSystem &fs, const IcebergOptions &options); + static IcebergSnapshot GetSnapshotById(const string &path, FileSystem &fs, idx_t snapshot_id, const IcebergOptions &options); + static IcebergSnapshot GetSnapshotByTimestamp(const string &path, FileSystem &fs, timestamp_t timestamp, const IcebergOptions &options); static IcebergSnapshot ParseSnapShot(yyjson_val *snapshot, idx_t iceberg_format_version, idx_t schema_id, - vector &schemas, string metadata_compression_codec, bool skip_schema_inference); - static string GetMetaDataPath(ClientContext &context, const string &path, FileSystem &fs, string metadata_compression_codec, string table_version, string version_format); - static string ReadMetaData(const string &path, FileSystem &fs, string metadata_compression_codec); + vector &schemas, const IcebergOptions &options); + static string GetMetaDataPath(ClientContext &context, const string &path, FileSystem &fs, const IcebergOptions &options); + static string ReadMetaData(const string &path, FileSystem &fs, const string &metadata_compression_codec); static yyjson_val *GetSnapshots(const string &path, FileSystem &fs, string GetSnapshotByTimestamp); static unique_ptr GetParseInfo(yyjson_doc &metadata_json); @@ -95,23 +77,24 @@ class IcebergSnapshot { //! Version extraction and identification static bool UnsafeVersionGuessingEnabled(ClientContext &context); static string GetTableVersionFromHint(const string &path, FileSystem &fs, string version_format); - static string GuessTableVersion(const string &meta_path, FileSystem &fs, string &table_version, string &metadata_compression_codec, string &version_format); + static string GuessTableVersion(const string &meta_path, FileSystem &fs, const IcebergOptions &options); static string PickTableVersion(vector &found_metadata, string &version_pattern, string &glob); + //! 
Internal JSON parsing functions static yyjson_val *FindLatestSnapshotInternal(yyjson_val *snapshots); static yyjson_val *FindSnapshotByIdInternal(yyjson_val *snapshots, idx_t target_id); static yyjson_val *FindSnapshotByIdTimestampInternal(yyjson_val *snapshots, timestamp_t timestamp); static vector ParseSchema(vector &schemas, idx_t schema_id); - static unique_ptr GetParseInfo(const string &path, FileSystem &fs, string metadata_compression_codec); + static unique_ptr GetParseInfo(const string &path, FileSystem &fs, const string &metadata_compression_codec); }; //! Represents the iceberg table at a specific IcebergSnapshot. Corresponds to a single Manifest List. struct IcebergTable { public: //! Loads all(!) metadata of into IcebergTable object - static IcebergTable Load(const string &iceberg_path, IcebergSnapshot &snapshot, FileSystem &fs, - bool allow_moved_paths = false, string metadata_compression_codec = "none"); + static IcebergTable Load(const string &iceberg_path, IcebergSnapshot &snapshot, FileSystem &fs, const IcebergOptions &options); +public: //! Returns all paths to be scanned for the IcebergManifestContentType template vector GetPaths() { @@ -129,6 +112,18 @@ struct IcebergTable { } return ret; } + vector GetAllPaths() { + vector ret; + for (auto &entry : entries) { + for (auto &manifest_entry : entry.manifest_entries) { + if (manifest_entry.status == IcebergManifestEntryStatusType::DELETED) { + continue; + } + ret.push_back(manifest_entry); + } + } + return ret; + } void Print() { Printer::Print("Iceberg table (" + path + ")"); diff --git a/src/include/iceberg_multi_file_reader.hpp b/src/include/iceberg_multi_file_reader.hpp new file mode 100644 index 0000000..47377f3 --- /dev/null +++ b/src/include/iceberg_multi_file_reader.hpp @@ -0,0 +1,192 @@ +//===----------------------------------------------------------------------===// +// DuckDB +// +// iceberg_multi_file_reader.hpp +// +// +//===----------------------------------------------------------------------===// + +#pragma once + +#include "duckdb/common/multi_file_reader.hpp" +#include "duckdb/common/types/batched_data_collection.hpp" +#include "iceberg_metadata.hpp" +#include "iceberg_utils.hpp" +#include "manifest_reader.hpp" + +namespace duckdb { + +// struct IcebergFileMetaData { +// public: +// IcebergFileMetaData() {}; +// IcebergFileMetaData (const IcebergFileMetaData&) = delete; +// IcebergFileMetaData& operator= (const IcebergFileMetaData&) = delete; +// public: +// optional_idx iceberg_snapshot_version; +// optional_idx file_number; +// optional_idx cardinality; +// case_insensitive_map_t partition_map; +//}; + +struct IcebergDeleteData { +public: + IcebergDeleteData() { + } + +public: + void AddRow(int64_t row_id) { + temp_invalid_rows.insert(row_id); + } + + void Apply(DataChunk &chunk, Vector &row_id_column) { + D_ASSERT(row_id_column.GetType() == LogicalType::BIGINT); + + if (chunk.size() == 0) { + return; + } + auto count = chunk.size(); + UnifiedVectorFormat data; + row_id_column.ToUnifiedFormat(count, data); + auto row_ids = UnifiedVectorFormat::GetData(data); + + SelectionVector result {count}; + idx_t selection_idx = 0; + for (idx_t tuple_idx = 0; tuple_idx < count; tuple_idx++) { + auto current_row_id = row_ids[data.sel->get_index(tuple_idx)]; + if (temp_invalid_rows.find(current_row_id) == temp_invalid_rows.end()) { + result[selection_idx++] = tuple_idx; + } + } + + chunk.Slice(result, selection_idx); + } + +public: + //! 
Store invalid rows here before finalizing into a SelectionVector + unordered_set temp_invalid_rows; +}; + +struct IcebergMultiFileList : public MultiFileList { +public: + IcebergMultiFileList(ClientContext &context, const string &path, const IcebergOptions &options); + +public: + static string ToDuckDBPath(const string &raw_path); + string GetPath(); + +public: + //! MultiFileList API + void Bind(vector &return_types, vector &names); + unique_ptr ComplexFilterPushdown(ClientContext &context, const MultiFileReaderOptions &options, + MultiFilePushdownInfo &info, + vector> &filters) override; + vector GetAllFiles() override; + FileExpandResult GetExpandResult() override; + idx_t GetTotalFileCount() override; + unique_ptr GetCardinality(ClientContext &context) override; + +public: + void ScanDeleteFile(const string &delete_file_path) const; + optional_ptr GetDeletesForFile(const string &file_path) const; + void ProcessDeletes() const; + +protected: + //! Get the i-th expanded file + string GetFile(idx_t i) override; + // TODO: How to guarantee we only call this after the filter pushdown? + void InitializeFiles(); + +public: + mutex lock; + // idx_t version; + + //! ComplexFilterPushdown results + vector names; + TableFilterSet table_filters; + + ////! Metadata map for files + // vector> metadata; + + ////! Current file list resolution state + unique_ptr manifest_reader; + unique_ptr data_manifest_entry_reader; + unique_ptr delete_manifest_entry_reader; + ManifestEntryReaderState reader_state; + vector data_files; + + vector> data_manifests; + vector> delete_manifests; + vector>::iterator current_data_manifest; + mutable vector>::iterator current_delete_manifest; + + //! For each file that has a delete file, the state for processing that/those delete file(s) + mutable case_insensitive_map_t delete_data; + mutable mutex delete_lock; + + bool initialized = false; + ClientContext &context; + const IcebergOptions &options; + IcebergSnapshot snapshot; +}; + +struct IcebergMultiFileReaderGlobalState : public MultiFileReaderGlobalState { +public: + IcebergMultiFileReaderGlobalState(vector extra_columns_p, const MultiFileList &file_list_p) + : MultiFileReaderGlobalState(std::move(extra_columns_p), file_list_p) { + } + +public: + //! The index of the column in the chunk that relates to the file_row_number + optional_idx file_row_number_idx; +}; + +struct IcebergMultiFileReader : public MultiFileReader { +public: + static unique_ptr CreateInstance(const TableFunction &table); + //! Return a IcebergSnapshot + shared_ptr CreateFileList(ClientContext &context, const vector &paths, + FileGlobOptions options) override; + + //! Override the regular parquet bind using the MultiFileReader Bind. The bind from these are what DuckDB's file + //! readers will try read + bool Bind(MultiFileReaderOptions &options, MultiFileList &files, vector &return_types, + vector &names, MultiFileReaderBindData &bind_data) override; + + //! 
Override the Options bind + void BindOptions(MultiFileReaderOptions &options, MultiFileList &files, vector &return_types, + vector &names, MultiFileReaderBindData &bind_data) override; + + void CreateColumnMapping(const string &file_name, + const vector &local_columns, + const vector &global_columns, + const vector &global_column_ids, MultiFileReaderData &reader_data, + const MultiFileReaderBindData &bind_data, const string &initial_file, + optional_ptr global_state) override; + + unique_ptr + InitializeGlobalState(ClientContext &context, const MultiFileReaderOptions &file_options, + const MultiFileReaderBindData &bind_data, const MultiFileList &file_list, + const vector &global_columns, + const vector &global_column_ids) override; + + void FinalizeBind(const MultiFileReaderOptions &file_options, + const MultiFileReaderBindData &options, const string &filename, + const vector &local_columns, + const vector &global_columns, + const vector &global_column_ids, MultiFileReaderData &reader_data, + ClientContext &context, optional_ptr global_state) override; + + //! Override the FinalizeChunk method + void FinalizeChunk(ClientContext &context, const MultiFileReaderBindData &bind_data, + const MultiFileReaderData &reader_data, DataChunk &chunk, + optional_ptr global_state) override; + + //! Override the ParseOption call to parse iceberg_scan specific options + bool ParseOption(const string &key, const Value &val, MultiFileReaderOptions &options, + ClientContext &context) override; + +public: + IcebergOptions options; +}; + +} // namespace duckdb \ No newline at end of file diff --git a/src/include/iceberg_options.hpp b/src/include/iceberg_options.hpp new file mode 100644 index 0000000..37d2d68 --- /dev/null +++ b/src/include/iceberg_options.hpp @@ -0,0 +1,44 @@ +#pragma once + +#include "duckdb/common/string.hpp" + +namespace duckdb { + +static string VERSION_GUESSING_CONFIG_VARIABLE = "unsafe_enable_version_guessing"; + +// When this is provided (and unsafe_enable_version_guessing is true) +// we first look for DEFAULT_VERSION_HINT_FILE, if it doesn't exist we +// then search for versions matching the DEFAULT_TABLE_VERSION_FORMAT +// We take the lexographically "greatest" one as the latest version +// Note that this will voliate ACID constraints in some situations. 
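+// For example, guessing version "3" with the default format below expands to the
+// candidates "v3.metadata.json" and "3.metadata.json" (or "v3.gz.metadata.json" and
+// "3.gz.metadata.json" when the metadata is gzip-compressed).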
+static string UNKNOWN_TABLE_VERSION = "?"; + +// First arg is version string, arg is either empty or ".gz" if gzip +// Allows for both "v###.gz.metadata.json" and "###.metadata.json" styles +static string DEFAULT_TABLE_VERSION_FORMAT = "v%s%s.metadata.json,%s%s.metadata.json"; + +// This isn't explicitly in the standard, but is a commonly used technique +static string DEFAULT_VERSION_HINT_FILE = "version-hint.text"; + +// By default we will use the unknown version behavior mentioned above +static string DEFAULT_TABLE_VERSION = UNKNOWN_TABLE_VERSION; + +enum class SnapshotSource : uint8_t { + LATEST, + FROM_TIMESTAMP, + FROM_ID +}; + +struct IcebergOptions { + bool allow_moved_paths = false; + string metadata_compression_codec = "none"; + bool skip_schema_inference = false; + string table_version = DEFAULT_TABLE_VERSION; + string version_name_format = DEFAULT_TABLE_VERSION_FORMAT; + + SnapshotSource snapshot_source = SnapshotSource::LATEST; + uint64_t snapshot_id; + timestamp_t snapshot_timestamp; +}; + +} // namespace duckdb diff --git a/src/include/iceberg_utils.hpp b/src/include/iceberg_utils.hpp index 0fa829e..737627a 100644 --- a/src/include/iceberg_utils.hpp +++ b/src/include/iceberg_utils.hpp @@ -11,6 +11,7 @@ #include "duckdb/common/printer.hpp" #include "iceberg_types.hpp" #include "yyjson.hpp" +#include "duckdb/common/file_system.hpp" using namespace duckdb_yyjson; diff --git a/src/include/manifest_reader.hpp b/src/include/manifest_reader.hpp new file mode 100644 index 0000000..7ce4184 --- /dev/null +++ b/src/include/manifest_reader.hpp @@ -0,0 +1,209 @@ +#pragma once + +#include "avro/Compiler.hh" +#include "avro/DataFile.hh" +#include "avro/Decoder.hh" +#include "avro/Encoder.hh" +#include "avro/Stream.hh" +#include "avro/ValidSchema.hh" +#include "iceberg_options.hpp" +#include "iceberg_types.hpp" + +namespace duckdb { + +// Manifest Reader + +class ManifestReader { +public: + ManifestReader() { + } + virtual ~ManifestReader() { + } + +public: + bool Finished() const { + return finished; + } + virtual unique_ptr GetNext() = 0; + +protected: + bool finished = false; +}; + +class ManifestReaderV1 : public ManifestReader { +public: + ManifestReaderV1(const string &table_path, const string &path, FileSystem &fs, const IcebergOptions &options) { + auto file = options.allow_moved_paths ? IcebergUtils::GetFullPath(table_path, path, fs) : path; + content = IcebergUtils::FileToString(file, fs); + + auto stream = avro::memoryInputStream((unsigned char *)content.c_str(), content.size()); + schema = avro::compileJsonSchemaFromString(MANIFEST_SCHEMA_V1); + reader = make_uniq>(std::move(stream), schema); + } + +public: + unique_ptr GetNext() { + if (finished || !reader->read(manifest_file)) { + finished = true; + return nullptr; + } + return make_uniq(manifest_file); + } + +private: + string content; + avro::ValidSchema schema; + unique_ptr> reader; + c::manifest_file_v1 manifest_file; +}; + +class ManifestReaderV2 : public ManifestReader { +public: + ManifestReaderV2(const string &table_path, const string &path, FileSystem &fs, const IcebergOptions &options) { + auto file = options.allow_moved_paths ? IcebergUtils::GetFullPath(table_path, path, fs) : path; + content = IcebergUtils::FileToString(file, fs); + + auto stream = avro::memoryInputStream((unsigned char *)content.c_str(), content.size()); + schema = avro::compileJsonSchemaFromString(MANIFEST_SCHEMA); + reader = make_uniq>(std::move(stream), schema); + } + +public: + unique_ptr GetNext() { + // FIXME: use `hasMore` instead? 
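+ // Note: avro's DataFileReader::read() returns false once the file is exhausted, so we
+ // latch `finished` here and return nullptr on this and every subsequent call.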
+ if (finished || !reader->read(manifest_file)) { + finished = true; + return nullptr; + } + return make_uniq(manifest_file); + } + +private: + string content; + avro::ValidSchema schema; + unique_ptr> reader; + c::manifest_file manifest_file; +}; + +// Manifest Entry Reader + +// FIXME: this is a little confusing, this is just used to initialize a ManifestEntryReader +// it does not hold any reading state +struct ManifestEntryReaderState { +public: + ManifestEntryReaderState(IcebergManifest &manifest) : manifest(manifest), initialized(false), finished(false) { + } + ManifestEntryReaderState() : manifest(nullptr), initialized(false), finished(true) { + } + +public: + optional_ptr manifest; + bool initialized = false; + bool finished = false; +}; + +class ManifestEntryReader { +public: + ManifestEntryReader(const string &table_path, FileSystem &fs, const IcebergOptions &options) + : table_path(table_path), fs(fs), options(options) { + } + virtual ~ManifestEntryReader() { + } + +public: + ManifestEntryReaderState InitializeScan(IcebergManifest &manifest) { + return ManifestEntryReaderState(manifest); + } + bool Finished() const { + return finished; + } + virtual unique_ptr GetNext(ManifestEntryReaderState &state) = 0; + +protected: + string table_path; + FileSystem &fs; + const IcebergOptions &options; + bool finished = false; +}; + +class ManifestEntryReaderV1 : public ManifestEntryReader { +public: + ManifestEntryReaderV1(const string &table_path, const string &path, FileSystem &fs, const IcebergOptions &options) + : ManifestEntryReader(table_path, fs, options) { + } + +public: + unique_ptr GetNext(ManifestEntryReaderState &state) { + if (state.finished) { + return nullptr; + } + + if (!state.initialized) { + // First call + auto file = options.allow_moved_paths + ? IcebergUtils::GetFullPath(table_path, state.manifest->manifest_path, fs) + : state.manifest->manifest_path; + content = IcebergUtils::FileToString(file, fs); + + auto stream = avro::memoryInputStream((unsigned char *)content.c_str(), content.size()); + schema = avro::compileJsonSchemaFromString(MANIFEST_ENTRY_SCHEMA_V1); + reader = make_uniq>(std::move(stream), schema); + state.initialized = true; + } + + if (!reader->read(manifest_entry)) { + state.finished = true; + return nullptr; + } + + return make_uniq(manifest_entry); + } + +public: + avro::ValidSchema schema; + string content; + c::manifest_entry_v1 manifest_entry; + unique_ptr> reader; +}; + +class ManifestEntryReaderV2 : public ManifestEntryReader { +public: + ManifestEntryReaderV2(const string &table_path, const string &path, FileSystem &fs, const IcebergOptions &options) + : ManifestEntryReader(table_path, fs, options) { + } + +public: + unique_ptr GetNext(ManifestEntryReaderState &state) { + if (state.finished) { + return nullptr; + } + + if (!state.initialized) { + // First call + auto file = options.allow_moved_paths + ? 
IcebergUtils::GetFullPath(table_path, state.manifest->manifest_path, fs) + : state.manifest->manifest_path; + content = IcebergUtils::FileToString(file, fs); + + auto stream = avro::memoryInputStream((unsigned char *)content.c_str(), content.size()); + schema = avro::compileJsonSchemaFromString(MANIFEST_ENTRY_SCHEMA); + reader = make_uniq>(std::move(stream), schema); + state.initialized = true; + } + + if (!reader->read(manifest_entry)) { + state.finished = true; + return nullptr; + } + + return make_uniq(manifest_entry); + } + +public: + avro::ValidSchema schema; + string content; + c::manifest_entry manifest_entry; + unique_ptr> reader; +}; + +} // namespace duckdb diff --git a/src/storage/irc_table_entry.cpp b/src/storage/irc_table_entry.cpp index 7aac12c..4fa20ac 100644 --- a/src/storage/irc_table_entry.cpp +++ b/src/storage/irc_table_entry.cpp @@ -40,14 +40,10 @@ void ICTableEntry::BindUpdateConstraints(Binder &binder, LogicalGet &, LogicalPr TableFunction ICTableEntry::GetScanFunction(ClientContext &context, unique_ptr &bind_data) { auto &db = DatabaseInstance::GetDatabase(context); + auto &iceberg_scan_function_set = ExtensionUtil::GetTableFunction(db, "iceberg_scan"); + auto iceberg_scan_function = iceberg_scan_function_set.functions.GetFunctionByArguments(context, {LogicalType::VARCHAR}); auto &ic_catalog = catalog.Cast(); - auto &parquet_function_set = ExtensionUtil::GetTableFunction(db, "parquet_scan"); - auto parquet_scan_function = parquet_function_set.functions.GetFunctionByArguments(context, {LogicalType::VARCHAR}); - - auto &iceberg_function_set = ExtensionUtil::GetTableFunction(db, "iceberg_scan"); - auto iceberg_scan_function = iceberg_function_set.functions.GetFunctionByArguments(context, {LogicalType::VARCHAR}); - D_ASSERT(table_data); if (table_data->data_source_format != "ICEBERG") { @@ -95,35 +91,13 @@ TableFunction ICTableEntry::GetScanFunction(ClientContext &context, unique_ptr inputs = {table_data->storage_location}; - TableFunctionBindInput bind_input(inputs, param_map, return_types, names, nullptr, nullptr, + TableFunctionBindInput bind_input(inputs, param_map, return_types, names, nullptr, nullptr, iceberg_scan_function, empty_ref); - auto table_ref = iceberg_scan_function.bind_replace(context, bind_input); - - // 1) Create a Binder and bind the parser-level TableRef -> BoundTableRef - auto binder = Binder::CreateBinder(context); - auto bound_ref = binder->Bind(*table_ref); - - // 2) Create a logical plan from the bound reference - unique_ptr logical_plan = binder->CreatePlan(*bound_ref); - - // 3) Recursively search the logical plan for a LogicalGet node - // For a single table function, you often have just one operator: LogicalGet - LogicalOperator *op = logical_plan.get(); - switch (op->type) { - case LogicalOperatorType::LOGICAL_PROJECTION: - throw NotImplementedException("Iceberg scans with point deletes not supported"); - case LogicalOperatorType::LOGICAL_GET: - break; - default: - throw InternalException("Unsupported logical operator"); - } - - // 4) Access the bind_data inside LogicalGet - auto &get = op->Cast(); - bind_data = std::move(get.bind_data); + auto result = iceberg_scan_function.bind(context, bind_input, return_types, names); + bind_data = std::move(result); - return parquet_scan_function; + return iceberg_scan_function; } TableStorageInfo ICTableEntry::GetStorageInfo(ClientContext &context) { diff --git a/test/sql/local/iceberg_on_tpch.test b/test/sql/local/iceberg_on_tpch.test index bd21f1f..4524dd0 100644 --- 
a/test/sql/local/iceberg_on_tpch.test +++ b/test/sql/local/iceberg_on_tpch.test @@ -5,10 +5,10 @@ require-env ICEBERG_SERVER_AVAILABLE -require iceberg - require parquet +require iceberg + require httpfs require tpch diff --git a/test/sql/local/iceberg_metadata.test b/test/sql/local/iceberg_scans/iceberg_metadata.test similarity index 99% rename from test/sql/local/iceberg_metadata.test rename to test/sql/local/iceberg_scans/iceberg_metadata.test index e4ef083..182c7c0 100644 --- a/test/sql/local/iceberg_metadata.test +++ b/test/sql/local/iceberg_scans/iceberg_metadata.test @@ -8,6 +8,8 @@ SELECT * FROM ICEBERG_METADATA('data/persistent/iceberg/lineitem_iceberg'); ---- Catalog Error +require parquet + require iceberg query IIIIIIII diff --git a/test/sql/local/iceberg_scan.test b/test/sql/local/iceberg_scans/iceberg_scan.test similarity index 83% rename from test/sql/local/iceberg_scan.test rename to test/sql/local/iceberg_scans/iceberg_scan.test index 1d6bfa1..8e2a706 100644 --- a/test/sql/local/iceberg_scan.test +++ b/test/sql/local/iceberg_scans/iceberg_scan.test @@ -12,7 +12,6 @@ require parquet require iceberg - ### Scanning latest snapshot query I SELECT count(*) FROM ICEBERG_SCAN('data/persistent/iceberg/lineitem_iceberg', ALLOW_MOVED_PATHS=TRUE); @@ -21,13 +20,13 @@ SELECT count(*) FROM ICEBERG_SCAN('data/persistent/iceberg/lineitem_iceberg', AL # Scanning 1st snapshot query I -SELECT count(*) FROM ICEBERG_SCAN('data/persistent/iceberg/lineitem_iceberg', 3776207205136740581::UBIGINT, ALLOW_MOVED_PATHS=TRUE); +SELECT count(*) FROM ICEBERG_SCAN('data/persistent/iceberg/lineitem_iceberg', snapshot_from_id=3776207205136740581::UBIGINT, ALLOW_MOVED_PATHS=TRUE); ---- 60175 # Scanning 2nd snapshot query I -SELECT count(*) FROM ICEBERG_SCAN('data/persistent/iceberg/lineitem_iceberg', 7635660646343998149::UBIGINT, ALLOW_MOVED_PATHS=TRUE); +SELECT count(*) FROM ICEBERG_SCAN('data/persistent/iceberg/lineitem_iceberg', snapshot_from_id=7635660646343998149::UBIGINT, ALLOW_MOVED_PATHS=TRUE); ---- 51793 @@ -42,22 +41,22 @@ SELECT count(*) FROM ICEBERG_SCAN('data/persistent/iceberg/lineitem_iceberg', ve # 1 = 2023-02-15 15:07:54.504 # 2 = 2023-02-15 15:08:14.73 query I -SELECT count(*) FROM ICEBERG_SCAN('data/persistent/iceberg/lineitem_iceberg', '2023-02-15 15:07:54.504'::TIMESTAMP, ALLOW_MOVED_PATHS=TRUE); +SELECT count(*) FROM ICEBERG_SCAN('data/persistent/iceberg/lineitem_iceberg', snapshot_from_timestamp='2023-02-15 15:07:54.504'::TIMESTAMP, ALLOW_MOVED_PATHS=TRUE); ---- 60175 query I -SELECT count(*) FROM ICEBERG_SCAN('data/persistent/iceberg/lineitem_iceberg', '2023-02-15 15:07:54.729'::TIMESTAMP, ALLOW_MOVED_PATHS=TRUE); +SELECT count(*) FROM ICEBERG_SCAN('data/persistent/iceberg/lineitem_iceberg', snapshot_from_timestamp='2023-02-15 15:07:54.729'::TIMESTAMP, ALLOW_MOVED_PATHS=TRUE); ---- 60175 query I -SELECT count(*) FROM ICEBERG_SCAN('data/persistent/iceberg/lineitem_iceberg', '2023-02-15 15:08:14.73'::TIMESTAMP, ALLOW_MOVED_PATHS=TRUE); +SELECT count(*) FROM ICEBERG_SCAN('data/persistent/iceberg/lineitem_iceberg', snapshot_from_timestamp='2023-02-15 15:08:14.73'::TIMESTAMP, ALLOW_MOVED_PATHS=TRUE); ---- 51793 statement error -FROM ICEBERG_SCAN('data/persistent/iceberg/lineitem_iceberg', '2023-02-15 15:07:54.503'::TIMESTAMP, ALLOW_MOVED_PATHS=TRUE); +FROM ICEBERG_SCAN('data/persistent/iceberg/lineitem_iceberg', snapshot_from_timestamp='2023-02-15 15:07:54.503'::TIMESTAMP, ALLOW_MOVED_PATHS=TRUE); ---- IO Error: Could not find latest snapshots for timestamp 2023-02-15 15:07:54.503 @@ 
-90,22 +89,22 @@ statement ok SET unsafe_enable_version_guessing=true; query I -SELECT count(*) FROM ICEBERG_SCAN('data/persistent/iceberg/lineitem_iceberg_no_hint', '2023-02-15 15:07:54.504'::TIMESTAMP, ALLOW_MOVED_PATHS=TRUE); +SELECT count(*) FROM ICEBERG_SCAN('data/persistent/iceberg/lineitem_iceberg_no_hint', snapshot_from_timestamp='2023-02-15 15:07:54.504'::TIMESTAMP, ALLOW_MOVED_PATHS=TRUE); ---- 60175 query I -SELECT count(*) FROM ICEBERG_SCAN('data/persistent/iceberg/lineitem_iceberg_no_hint', '2023-02-15 15:07:54.729'::TIMESTAMP, ALLOW_MOVED_PATHS=TRUE); +SELECT count(*) FROM ICEBERG_SCAN('data/persistent/iceberg/lineitem_iceberg_no_hint', snapshot_from_timestamp='2023-02-15 15:07:54.729'::TIMESTAMP, ALLOW_MOVED_PATHS=TRUE); ---- 60175 query I -SELECT count(*) FROM ICEBERG_SCAN('data/persistent/iceberg/lineitem_iceberg_no_hint', '2023-02-15 15:08:14.73'::TIMESTAMP, ALLOW_MOVED_PATHS=TRUE); +SELECT count(*) FROM ICEBERG_SCAN('data/persistent/iceberg/lineitem_iceberg_no_hint', snapshot_from_timestamp='2023-02-15 15:08:14.73'::TIMESTAMP, ALLOW_MOVED_PATHS=TRUE); ---- 51793 statement error -FROM ICEBERG_SCAN('data/persistent/iceberg/lineitem_iceberg_no_hint', '2023-02-15 15:07:54.503'::TIMESTAMP, ALLOW_MOVED_PATHS=TRUE); +FROM ICEBERG_SCAN('data/persistent/iceberg/lineitem_iceberg_no_hint', snapshot_from_timestamp='2023-02-15 15:07:54.503'::TIMESTAMP, ALLOW_MOVED_PATHS=TRUE); ---- IO Error: Could not find latest snapshots for timestamp 2023-02-15 15:07:54.503 diff --git a/test/sql/local/iceberg_scan_generated_data_0_001.test b/test/sql/local/iceberg_scans/iceberg_scan_generated_data_0_001.test similarity index 98% rename from test/sql/local/iceberg_scan_generated_data_0_001.test rename to test/sql/local/iceberg_scans/iceberg_scan_generated_data_0_001.test index 1a3584f..400c24d 100644 --- a/test/sql/local/iceberg_scan_generated_data_0_001.test +++ b/test/sql/local/iceberg_scans/iceberg_scan_generated_data_0_001.test @@ -121,4 +121,4 @@ Binder Error query II EXPLAIN SELECT count(*) FROM ICEBERG_SCAN('data/generated/iceberg/spark-local/pyspark_iceberg_table_v2'); ---- -physical_plan :.* ANTI .*PARQUET_SCAN.*Rows.*Rows.* \ No newline at end of file +physical_plan :.*ICEBERG_SCAN.*Rows.* diff --git a/test/sql/local/iceberg_scan_generated_data_1.test_slow b/test/sql/local/iceberg_scans/iceberg_scan_generated_data_1.test_slow similarity index 100% rename from test/sql/local/iceberg_scan_generated_data_1.test_slow rename to test/sql/local/iceberg_scans/iceberg_scan_generated_data_1.test_slow diff --git a/test/sql/local/iceberg_snapshots.test b/test/sql/local/iceberg_scans/iceberg_snapshots.test similarity index 99% rename from test/sql/local/iceberg_snapshots.test rename to test/sql/local/iceberg_scans/iceberg_snapshots.test index 3fc61e1..4accb3c 100644 --- a/test/sql/local/iceberg_snapshots.test +++ b/test/sql/local/iceberg_scans/iceberg_snapshots.test @@ -10,6 +10,8 @@ Catalog Error require notwindows +require parquet + require iceberg query IIII diff --git a/test/sql/local/iceberg_scans/iceberge_read_deletes.test b/test/sql/local/iceberg_scans/iceberge_read_deletes.test new file mode 100644 index 0000000..3dc3462 --- /dev/null +++ b/test/sql/local/iceberg_scans/iceberge_read_deletes.test @@ -0,0 +1,167 @@ +# name: test/sql/local/iceberg_catalog_read.test +# description: test integration with iceberg catalog read +# group: [iceberg] + +require-env DUCKDB_ICEBERG_HAVE_GENERATED_DATA + +require parquet + +require iceberg + +# TODO verify the catalog has deletes (rest catalog stores data 
differently from local catalog) +query I nosort results_1 +select sum(l_suppkey), min(l_suppkey), max(l_suppkey) from ICEBERG_SCAN('data/generated/iceberg/spark-local/table_with_deletes'); + +query I nosort results_1 +select sum(l_suppkey), min(l_suppkey), max(l_suppkey) from read_parquet('data/generated/intermediates/spark-local/table_with_deletes/last/data.parquet/*.parquet'); + +# Verify parquet scans on tables with delete vectors do not mess with results. +query I nosort table_filter_result +select l_partkey from ICEBERG_SCAN('data/generated/iceberg/spark-local/lineitem_001_deletes') where l_partkey > 5 and l_partkey < 20000; +---- + +query I nosort table_filter_result +select l_partkey from read_parquet('data/generated/intermediates/spark-local/lineitem_001_deletes/last/data.parquet/*.parquet') where l_partkey > 5 and l_partkey < 20000; +---- + +# Verify Deletes +# joins with a table that has deletes. +# Projecting l_orderkey, joining on l_partkey +query I nosort join_results +select l1_deletes.l_partkey, count(*) count from + ICEBERG_SCAN('data/generated/iceberg/spark-local/lineitem_sf1_deletes') l1_deletes, + ICEBERG_SCAN('data/generated/iceberg/spark-local/lineitem_sf_01_no_deletes') l2_no_deletes +where l1_deletes.l_partkey = l2_no_deletes.l_partkey +group by l1_deletes.l_partkey +order by l1_deletes.l_partkey, count +---- + +query I nosort join_results +select l1_deletes.l_partkey, count(*) count from +ICEBERG_SCAN('data/generated/iceberg/spark-local/lineitem_sf1_deletes') l1_deletes, +ICEBERG_SCAN('data/generated/iceberg/spark-local/lineitem_sf_01_no_deletes') l2_no_deletes +where l1_deletes.l_partkey = l2_no_deletes.l_partkey +group by l1_deletes.l_partkey +order by l1_deletes.l_partkey, count +---- + + +# Verify Deletes +# joins with a table that has deletes. 
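+# The two queries below run under the same nosort label, so both executions of the
+# join over the table with deletes must return identical results.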
+# Projecting l_orderkey, joining on l_partkey +query I nosort join_results_2_orderkey +select l1_deletes.l_orderkey, count(*) count from + ICEBERG_SCAN('data/generated/iceberg/spark-local/lineitem_sf1_deletes') l1_deletes, + ICEBERG_SCAN('data/generated/iceberg/spark-local/lineitem_sf_01_no_deletes') l2_no_deletes +where l1_deletes.l_partkey = l2_no_deletes.l_partkey +group by l1_deletes.l_orderkey +order by l1_deletes.l_orderkey, count +---- + + +query I nosort join_results_2_orderkey +select l1_deletes.l_orderkey, count(*) count from +ICEBERG_SCAN('data/generated/iceberg/spark-local/lineitem_sf1_deletes') l1_deletes, +ICEBERG_SCAN('data/generated/iceberg/spark-local/lineitem_sf_01_no_deletes') l2_no_deletes +where l1_deletes.l_partkey = l2_no_deletes.l_partkey +group by l1_deletes.l_orderkey +order by l1_deletes.l_orderkey, count +---- + + +# Verify a single delete +query IIII nosort single_delete_result +select l_orderkey, + l_partkey, + l_suppkey, + l_quantity +from ICEBERG_SCAN('data/generated/iceberg/spark-local/lineitem_sf_01_1_delete') +order by l_partkey, l_orderkey limit 10; +---- + +query IIII nosort single_delete_result +select l_orderkey, + l_partkey, + l_suppkey, + l_quantity +from read_parquet('data/generated/intermediates/spark-rest/lineitem_sf_01_1_delete/last/data.parquet/*.parquet') +order by l_partkey, l_orderkey limit 10; +---- + +query I +select count(*) +from ICEBERG_SCAN('data/generated/iceberg/spark-local/lineitem_sf_01_1_delete') +where l_orderkey=10053 and l_partkey = 77; +---- +0 + +query I +select count(*) +from read_parquet('data/generated/intermediates/spark-rest/lineitem_sf_01_1_delete/last/data.parquet/*.parquet') +where l_orderkey=10053 and l_partkey = 77; +---- +0 + + +# verify paritioned table read +# add tests for partitioned tables. +query II nosort result_4_partitioned +select l_shipmode, count(*) count +from ICEBERG_SCAN('data/generated/iceberg/spark-local/lineitem_partitioned_l_shipmode') +group by l_shipmode order by count; +---- + +query II nosort result_4_partitioned +select l_shipmode, count(*) count +from read_parquet('data/generated/intermediates/spark-rest/lineitem_partitioned_l_shipmode/last/data.parquet/*.parquet') +group by l_shipmode order by count; +---- + +# verify paritioned table read with table filters +# add tests for partitioned tables. +query II nosort result_4_partitioned_table_filters +select l_shipmode, count(*) count +from ICEBERG_SCAN('data/generated/iceberg/spark-local/lineitem_partitioned_l_shipmode') +where l_partkey > 50 +group by l_shipmode order by count; +---- + +query II nosort result_4_partitioned_table_filters +select l_shipmode, count(*) count +from read_parquet('data/generated/intermediates/spark-rest/lineitem_partitioned_l_shipmode/last/data.parquet/*.parquet') +where l_partkey > 50 +group by l_shipmode order by count; +---- + +# verify delete from partitioned table +# create table lineitem_partitioned_mmany_deletes as select * from lineitem (merge on write, partition by l_shipmode) +# select count(*), l_shipmode from lineitem where l_linenumber in (3,4,5,6) group by l_shipmode ; +# add tests for partitioned tables. 
+query II nosort result_5
+select l_shipmode, count(*) count
+from ICEBERG_SCAN('data/generated/iceberg/spark-local/lineitem_partitioned_l_shipmode_deletes')
+group by l_shipmode order by count;
+----
+
+query II nosort result_5
+select l_shipmode, count(*) count
+from read_parquet('data/generated/intermediates/spark-rest/lineitem_partitioned_l_shipmode_deletes/last/data.parquet/*.parquet')
+group by l_shipmode order by count;
+----
+
+# verify select on partitioned table deletes with table_filters
+query II nosort result_5_table_filter
+select l_shipmode, count(*) count
+from ICEBERG_SCAN('data/generated/iceberg/spark-local/lineitem_partitioned_l_shipmode_deletes')
+where l_partkey > 100
+group by l_shipmode order by count;
+----
+
+query II nosort result_5_table_filter
+select l_shipmode, count(*) count
+from read_parquet('data/generated/intermediates/spark-rest/lineitem_partitioned_l_shipmode_deletes/last/data.parquet/*.parquet')
+where l_partkey > 100
+group by l_shipmode order by count;
+----
+
+
diff --git a/test/sql/local/iceberg_catalog_read.test b/test/sql/local/irc/iceberg_catalog_read.test
similarity index 94%
rename from test/sql/local/iceberg_catalog_read.test
rename to test/sql/local/irc/iceberg_catalog_read.test
index a9f17b1..e587b92 100644
--- a/test/sql/local/iceberg_catalog_read.test
+++ b/test/sql/local/irc/iceberg_catalog_read.test
@@ -4,10 +4,10 @@

 require-env ICEBERG_SERVER_AVAILABLE

-require iceberg
-
 require parquet

+require iceberg
+
 require httpfs

 statement ok
@@ -27,6 +27,7 @@ CREATE SECRET (
 	USE_SSL 0
 );

+
 statement ok
 ATTACH '' AS my_datalake (TYPE ICEBERG);

@@ -67,10 +68,11 @@ select * from my_datalake.default.table_more_deletes order by all;
 2023-03-11	11	k
 2023-03-12	12	l

-statement error
-select * from my_datalake.default.pyspark_iceberg_table_v2;
+
+query I
+select sum(l_suppkey_long) from my_datalake.default.pyspark_iceberg_table_v2;
 ----
-:.*Not implemented Error.*
+20352

 statement error
 update my_datalake.default.table_unpartitioned set number = 5 where number < 5;
diff --git a/test/sql/local/irc/irc_catalog_read_deletes.test b/test/sql/local/irc/irc_catalog_read_deletes.test
new file mode 100644
index 0000000..880c3a0
--- /dev/null
+++ b/test/sql/local/irc/irc_catalog_read_deletes.test
@@ -0,0 +1,104 @@
+# name: test/sql/local/irc/irc_catalog_read_deletes.test
+# description: test reading tables with deletes through the iceberg catalog
+# group: [iceberg]
+
+require-env ICEBERG_SERVER_AVAILABLE
+
+require parquet
+
+require iceberg
+
+require httpfs
+
+statement ok
+CREATE SECRET (
+	TYPE ICEBERG,
+	ENDPOINT 'http://127.0.0.1:8181'
+);
+
+
+statement ok
+CREATE SECRET (
+	TYPE S3,
+	KEY_ID 'admin',
+	SECRET 'password',
+	ENDPOINT '127.0.0.1:9000',
+	URL_STYLE 'path',
+	USE_SSL 0
+);
+
+
+statement ok
+ATTACH '' AS my_datalake (TYPE ICEBERG);
+
+# TODO verify the catalog has deletes (rest catalog stores data differently from local catalog)
+query I nosort results_1
+select sum(l_suppkey), min(l_suppkey), max(l_suppkey) from my_datalake.default.lineitem_001_deletes;
+
+query I nosort results_1
+select sum(l_suppkey), min(l_suppkey), max(l_suppkey) from read_parquet('data/generated/intermediates/spark-rest/lineitem_001_deletes/last/data.parquet/*.parquet');
+
+# Verify Deletes
+# joins with a table that has deletes.
+query I nosort results_2
+select l1_deletes.l_orderkey, count(*) count from
+    my_datalake.default.lineitem_sf1_deletes l1_deletes,
+    my_datalake.default.lineitem_sf_01_no_deletes l2_no_deletes
+where l1_deletes.l_partkey = l2_no_deletes.l_partkey
+group by l1_deletes.l_orderkey
+order by l1_deletes.l_orderkey, count
+limit 10;
+----
+
+
+query I nosort results_2
+select l1_deletes.l_orderkey, count(*) count from
+    read_parquet('data/generated/intermediates/spark-rest/lineitem_sf1_deletes/last/data.parquet/*.parquet') l1_deletes,
+    read_parquet('data/generated/intermediates/spark-rest/lineitem_sf_01_no_deletes/last/data.parquet/*.parquet') l2_no_deletes
+where l1_deletes.l_partkey = l2_no_deletes.l_partkey
+group by l1_deletes.l_orderkey
+order by l1_deletes.l_orderkey, count
+limit 10
+;
+
+# Verify a single delete
+query IIII nosort result_3
+select l_orderkey, l_partkey, l_suppkey, l_quantity from my_datalake.default.lineitem_sf_01_1_delete order by l_partkey, l_orderkey limit 10;
+----
+
+query IIII nosort result_3
+select l_orderkey, l_partkey, l_suppkey, l_quantity from read_parquet('data/generated/intermediates/spark-rest/lineitem_sf_01_1_delete/last/data.parquet/*.parquet') order by l_partkey, l_orderkey limit 10;
+----
+
+query I
+select count(*) from my_datalake.default.lineitem_sf_01_1_delete where l_orderkey=10053 and l_partkey = 77;
+----
+0
+
+query I
+select count(*) from read_parquet('data/generated/intermediates/spark-rest/lineitem_sf_01_1_delete/last/data.parquet/*.parquet') where l_orderkey=10053 and l_partkey = 77;
+----
+0
+
+
+# Verify reading from large partitioned table
+# add tests for partitioned tables.
+query II nosort result_4
+select l_shipmode, count(*) count from my_datalake.default.lineitem_partitioned_l_shipmode group by l_shipmode order by count;
+----
+
+query II nosort result_4
+select l_shipmode, count(*) count from read_parquet('data/generated/intermediates/spark-rest/lineitem_partitioned_l_shipmode/last/data.parquet/*.parquet') group by l_shipmode order by count;
+----
+
+
+# Verify reading from large partitioned table with deletes
+query II nosort result_5
+select l_shipmode, count(*) count from my_datalake.default.lineitem_partitioned_l_shipmode_deletes group by l_shipmode order by count;
+----
+
+query II nosort result_5
+select l_shipmode, count(*) count from read_parquet('data/generated/intermediates/spark-rest/lineitem_partitioned_l_shipmode_deletes/last/data.parquet/*.parquet') group by l_shipmode order by count;
+----
+
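Note (editorial, not part of the diff): the comparison pattern these tests rely on — an ICEBERG_SCAN of a table with delete files must agree with the Spark-materialized parquet intermediate — can also be reproduced ad hoc from Python, mirroring the setup.py generator scripts above. The sketch below is illustrative only; it assumes the generated data already exists under data/generated (e.g. after make data) and that a DuckDB build with the iceberg extension is available. The table and paths are copied from the tests above.

import duckdb

# Illustrative sketch: cross-check an Iceberg table that has delete files
# against the parquet intermediate written by the data generators.
con = duckdb.connect()
con.install_extension("iceberg")   # assumption: the iceberg extension can be installed/loaded here
con.load_extension("iceberg")

AGG = "select sum(l_suppkey), min(l_suppkey), max(l_suppkey) from {src}"

iceberg_src = "iceberg_scan('data/generated/iceberg/spark-local/lineitem_001_deletes')"
parquet_src = "read_parquet('data/generated/intermediates/spark-local/lineitem_001_deletes/last/data.parquet/*.parquet')"

iceberg_row = con.execute(AGG.format(src=iceberg_src)).fetchone()
parquet_row = con.execute(AGG.format(src=parquet_src)).fetchone()

# If the delete files were applied correctly, both sources yield the same aggregates.
assert iceberg_row == parquet_row, (iceberg_row, parquet_row)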