Merge pull request #98 from Tmonster/add_iceberg_catalog_read_support
Add iceberg catalog read support
samansmink authored Feb 12, 2025
2 parents 43b4e37 + 85950a2 commit 5756233
Showing 36 changed files with 2,228 additions and 57 deletions.
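For context: this PR teaches the extension to read tables through an Iceberg REST catalog, on top of the existing `iceberg_scan` file-based access. Below is a minimal usage sketch in Python, assuming the local REST catalog started by `scripts/start-rest-catalog.sh` is running; the `ATTACH ... (TYPE ICEBERG, ENDPOINT ...)` option names and the `demo`/`default` catalog and schema names are illustrative assumptions, not confirmed by this diff.

```python
import duckdb

con = duckdb.connect()
con.install_extension("iceberg")
con.load_extension("iceberg")

# Assumed ATTACH syntax for the new catalog read support; adjust the option
# names to whatever the released extension actually exposes.
con.sql("""
    ATTACH 'demo' AS rest_catalog (
        TYPE ICEBERG,
        ENDPOINT 'http://127.0.0.1:8181'
    );
""")

# Read-only: browse the attached catalog, then query a table through it.
con.sql("SHOW ALL TABLES;").show()
con.sql("SELECT count(*) FROM rest_catalog.default.table_unpartitioned;").show()
```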
12 changes: 6 additions & 6 deletions .github/workflows/MainDistributionPipeline.yml
@@ -14,21 +14,21 @@ concurrency:
 jobs:
   duckdb-stable-build:
     name: Build extension binaries
-    uses: duckdb/extension-ci-tools/.github/workflows/_extension_distribution.yml@main
+    uses: duckdb/extension-ci-tools/.github/workflows/_extension_distribution.yml@v1.2.0
     with:
       extension_name: iceberg
-      duckdb_version: main
-      ci_tools_version: main
+      duckdb_version: v1.2.0
+      ci_tools_version: v1.2.0
       exclude_archs: 'wasm_mvp;wasm_eh;wasm_threads;windows_amd64_rtools;windows_amd64_mingw'

   duckdb-stable-deploy:
     name: Deploy extension binaries
     needs: duckdb-stable-build
-    uses: duckdb/extension-ci-tools/.github/workflows/_extension_deploy.yml@main
+    uses: duckdb/extension-ci-tools/.github/workflows/_extension_deploy.yml@v1.2.0
     secrets: inherit
     with:
       extension_name: iceberg
-      duckdb_version: main
-      ci_tools_version: main
+      duckdb_version: v1.2.0
+      ci_tools_version: v1.2.0
       exclude_archs: 'wasm_mvp;wasm_eh;wasm_threads;windows_amd64_rtools;windows_amd64_mingw'
       deploy_latest: ${{ startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main' }}
6 changes: 6 additions & 0 deletions .github/workflows/Rest.yml
@@ -50,3 +50,9 @@ jobs:
         working-directory: scripts/
         run: |
           ./start-rest-catalog.sh
+      - name: Test With rest catalog
+        env:
+          ICEBERG_SERVER_AVAILABLE: 1
+        run: |
+          make test_release
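The new CI step only exercises the REST-catalog tests once the catalog container is up, signalled via `ICEBERG_SERVER_AVAILABLE`. Here is a small pre-flight sketch of the same gate, assuming the catalog listens on port 8181 and serves the standard Iceberg REST `/v1/config` endpoint; both are assumptions based on common setups, not this diff.

```python
import os
import urllib.request

def rest_catalog_up(url: str = "http://127.0.0.1:8181/v1/config") -> bool:
    """Return True if the local Iceberg REST catalog answers; port is an assumption."""
    try:
        with urllib.request.urlopen(url, timeout=5) as resp:
            return resp.status == 200
    except OSError:
        return False

# Mirror the workflow: only enable the REST tests when the server is reachable.
if rest_catalog_up():
    os.environ["ICEBERG_SERVER_AVAILABLE"] = "1"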
20 changes: 17 additions & 3 deletions CMakeLists.txt
@@ -13,12 +13,24 @@ include_directories(src/include)
 set(EXTENSION_SOURCES
     src/iceberg_extension.cpp
     src/iceberg_functions.cpp
+    src/catalog_api.cpp
+    src/catalog_utils.cpp
     src/common/utils.cpp
     src/common/schema.cpp
     src/common/iceberg.cpp
     src/iceberg_functions/iceberg_snapshots.cpp
     src/iceberg_functions/iceberg_scan.cpp
-    src/iceberg_functions/iceberg_metadata.cpp)
+    src/iceberg_functions/iceberg_metadata.cpp
+    src/storage/irc_catalog.cpp
+    src/storage/irc_catalog_set.cpp
+    src/storage/irc_clear_cache.cpp
+    src/storage/irc_schema_entry.cpp
+    src/storage/irc_schema_set.cpp
+    src/storage/irc_table_entry.cpp
+    src/storage/irc_table_set.cpp
+    src/storage/irc_transaction.cpp
+    src/storage/irc_transaction_manager.cpp
+)

 add_library(${EXTENSION_NAME} STATIC ${EXTENSION_SOURCES})

@@ -73,10 +85,12 @@ target_link_libraries(
   Snappy::snappy
   ZLIB::ZLIB)

+find_package(CURL REQUIRED)
+
 # Link dependencies into extension
-target_link_libraries(${EXTENSION_NAME} PUBLIC optimized avro_static_release
+target_link_libraries(${EXTENSION_NAME} PUBLIC optimized avro_static_release CURL::libcurl
                       debug avro_static_debug)
-target_link_libraries(${TARGET_NAME}_loadable_extension optimized
+target_link_libraries(${TARGET_NAME}_loadable_extension optimized CURL::libcurl
                       avro_static_release debug avro_static_debug)

 install(
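CURL becomes a hard dependency here because the new catalog code talks HTTP to the REST catalog. For intuition, the request sequence looks roughly like the following Python sketch (using `urllib` in place of libcurl); the endpoint shapes follow the published Iceberg REST catalog spec, while the base URL and the namespace/table names are assumptions matching this repo's test setup.

```python
import json
import urllib.request

BASE = "http://127.0.0.1:8181/v1"  # assumed local REST catalog address

def get(path: str) -> dict:
    with urllib.request.urlopen(f"{BASE}{path}", timeout=10) as resp:
        return json.load(resp)

# 1. discover namespaces, 2. list their tables, 3. load one table, whose
# response points at the Iceberg metadata JSON that the scan then reads.
namespaces = get("/namespaces")["namespaces"]              # e.g. [["default"]]
tables = get("/namespaces/default/tables")["identifiers"]
table = get("/namespaces/default/tables/table_unpartitioned")
print(table["metadata-location"])
```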
95 changes: 93 additions & 2 deletions scripts/provision.py
@@ -18,6 +18,13 @@
 from pyspark.sql import SparkSession

 import os
+import duckdb
+
+in_scripts_dir = os.path.basename(os.path.dirname(__file__)) == 'scripts'
+if not in_scripts_dir:
+    print("please run provision.py from duckdb-iceberg/scripts dir")
+    exit(1)
+

 os.environ[
     "PYSPARK_SUBMIT_ARGS"
@@ -40,7 +47,10 @@
     .config("spark.sql.catalog.demo.s3.endpoint", "http://127.0.0.1:9000")
     .config("spark.sql.catalog.demo.s3.path-style-access", "true")
     .config("spark.sql.defaultCatalog", "demo")
+    .config('spark.driver.memory', '10g')
     .config("spark.sql.catalogImplementation", "in-memory")
+    .config('spark.jars', f'test_data_generator/iceberg-spark-runtime-3.5_2.12-1.4.2.jar')
+    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
     .getOrCreate()
 )
@@ -57,10 +67,11 @@
     number integer,
     letter string
   )
-  USING iceberg
+  USING iceberg
   """
 )

+
 spark.sql(
     """
     INSERT INTO default.table_unpartitioned
@@ -132,7 +143,6 @@
     """
 )

-
 spark.sql(
     """
     INSERT INTO default.table_mor_deletes
@@ -151,3 +161,84 @@
     (CAST('2023-03-12' AS date), 12, 'l');
     """
 )
+
+spark.sql(
+    """
+    Delete from default.table_mor_deletes
+    where number > 3 and number < 10;
+    """
+)
+
+# TODO find better script to generate deletes in iceberg
+CWD=".."
+DEST_PATH='data/iceberg/generated_spec1_0_001'
+os.system(f"python3 test_data_generator/generate_base_parquet.py 001 {CWD}/{DEST_PATH} spark")
+location = "../data/iceberg/generated_spec1_0_001/base_file/file.parquet"
+spark.read.parquet(location).createOrReplaceTempView('parquet_lineitem_view');
+
+spark.sql(
+    """
+    CREATE OR REPLACE TABLE default.pyspark_iceberg_table
+    USING ICEBERG
+    TBLPROPERTIES (
+        'format-version'='2',
+        'write.update.mode'='merge-on-read'
+    )
+    As select * from parquet_lineitem_view
+    """
+)
+
+spark.sql("""
+update default.pyspark_iceberg_table
+set l_orderkey_bool=NULL,
+    l_partkey_int=NULL,
+    l_suppkey_long=NULL,
+    l_extendedprice_float=NULL,
+    l_extendedprice_double=NULL,
+    l_shipdate_date=NULL,
+    l_partkey_time=NULL,
+    l_commitdate_timestamp=NULL,
+    l_commitdate_timestamp_tz=NULL,
+    l_comment_string=NULL,
+    l_comment_blob=NULL
+where l_partkey_int % 2 = 0;""")
+
+spark.sql("""
+insert into default.pyspark_iceberg_table
+select * FROM default.pyspark_iceberg_table
+where l_extendedprice_double < 30000
+""")
+
+spark.sql("""
+update default.pyspark_iceberg_table
+set l_orderkey_bool = not l_orderkey_bool;
+""")
+
+
+spark.sql("""
+delete
+from default.pyspark_iceberg_table
+where l_extendedprice_double < 10000;
+""")
+
+spark.sql("""
+delete
+from default.pyspark_iceberg_table
+where l_extendedprice_double > 70000;
+""")
+
+spark.sql("""
+ALTER TABLE default.pyspark_iceberg_table
+ADD COLUMN schema_evol_added_col_1 INT DEFAULT 42;
+""")
+
+spark.sql("""
+UPDATE default.pyspark_iceberg_table
+SET schema_evol_added_col_1 = l_partkey_int
+WHERE l_partkey_int % 5 = 0;
+""")
+
+spark.sql("""
+ALTER TABLE default.pyspark_iceberg_table
+ALTER COLUMN schema_evol_added_col_1 TYPE BIGINT;
+""")
3 changes: 2 additions & 1 deletion scripts/requirements.txt
@@ -1 +1,2 @@
-pyspark==3.4.1
+pyspark==3.5.0
+duckdb