Skip to content

Commit

Permalink
finally have delete vectors working consistently in a rest catalog. I…
Browse files Browse the repository at this point in the history
… will clean them up though
  • Loading branch information
Tmonster committed Feb 12, 2025
1 parent 0d5712a commit c5d7790
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 17 deletions.
86 changes: 71 additions & 15 deletions scripts/provision.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@
import os
import duckdb

# The relative paths used further down (e.g. 'test_data_generator/...') are
# resolved against the current working directory, so verify the directory the
# script is being run from rather than the directory the file lives in.
in_scripts_dir = os.path.basename(os.getcwd()) == 'scripts'
if not in_scripts_dir:
    print("please run provision.py from duckdb-iceberg/scripts dir")
    # SystemExit is a builtin; the bare exit() helper is only injected by the
    # site module and is not guaranteed to exist.
    raise SystemExit(1)


# Pass the Iceberg Spark runtime and AWS bundle coordinates to PySpark via
# the --packages submit argument.
os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join(
    (
        "--packages",
        "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2,org.apache.iceberg:iceberg-aws-bundle:1.4.2",
        "pyspark-shell",
    )
)
Expand All @@ -43,7 +49,7 @@
.config("spark.sql.defaultCatalog", "demo")
.config('spark.driver.memory', '10g')
.config("spark.sql.catalogImplementation", "in-memory")
.config('spark.jars', f'scripts/test_data_generator/iceberg-spark-runtime-3.5_2.12-1.4.2.jar')
.config('spark.jars', f'test_data_generator/iceberg-spark-runtime-3.5_2.12-1.4.2.jar')
.config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
.getOrCreate()
)
Expand All @@ -61,10 +67,11 @@
number integer,
letter string
)
USING iceberg
USING iceberg
"""
)


spark.sql(
"""
INSERT INTO default.table_unpartitioned
Expand Down Expand Up @@ -136,7 +143,6 @@
"""
)


spark.sql(
"""
INSERT INTO default.table_mor_deletes
Expand All @@ -163,26 +169,76 @@
"""
)

con = duckdb.connect()
con.query("INSTALL tpch")
con.query("LOAD tpch")
con.query(f"SELECT setseed(0.42);")
con.query(f"CALL dbgen(sf=1);")
location = "scripts/lineitem.parquet"
os.makedirs(os.path.dirname(location), exist_ok=True)
con.query(f"COPY lineitem TO '{location}' (FORMAT PARQUET)");

# TODO find better script to generate deletes in iceberg
import subprocess

CWD = ".."
DEST_PATH = 'data/iceberg/generated_spec1_0_001'

# Run the base parquet generator as a child process. check=True aborts
# provisioning right away if the generator fails, instead of continuing and
# failing later when the parquet file is read.
subprocess.run(
    [
        "python3",
        "test_data_generator/generate_base_parquet.py",
        "001",
        f"{CWD}/{DEST_PATH}",
        "spark",
    ],
    check=True,
)

# Read the generated file back and expose it to Spark SQL as a temp view that
# later statements can select from.
location = f"{CWD}/{DEST_PATH}/base_file/file.parquet"
spark.read.parquet(location).createOrReplaceTempView('parquet_lineitem_view')

spark.sql(
"""
CREATE OR REPLACE TABLE default.lineitem
CREATE OR REPLACE TABLE default.pyspark_iceberg_table
USING ICEBERG
TBLPROPERTIES (
'format-version'='2'
'format-version'='2',
'write.update.mode'='merge-on-read'
)
As select * from parquet_lineitem_view
"""
)

spark.sql("update default.lineitem set l_orderkey=5")
# The statements below mutate default.pyspark_iceberg_table in sequence; each
# one operates on the result of the previous one, so their order matters.
# NOTE(review): presumably these updates/deletes exist to produce delete files
# for the table's merge-on-read update mode -- confirm.

# Set the listed columns to NULL on every row whose l_partkey_int is even.
spark.sql("""
update default.pyspark_iceberg_table
set l_orderkey_bool=NULL,
l_partkey_int=NULL,
l_suppkey_long=NULL,
l_extendedprice_float=NULL,
l_extendedprice_double=NULL,
l_shipdate_date=NULL,
l_partkey_time=NULL,
l_commitdate_timestamp=NULL,
l_commitdate_timestamp_tz=NULL,
l_comment_string=NULL,
l_comment_blob=NULL
where l_partkey_int % 2 = 0;""")

# Insert a second copy of the rows whose l_extendedprice_double is below 30000.
spark.sql("""
insert into default.pyspark_iceberg_table
select * FROM default.pyspark_iceberg_table
where l_extendedprice_double < 30000
""")

# Negate l_orderkey_bool on all rows (no WHERE clause).
spark.sql("""
update default.pyspark_iceberg_table
set l_orderkey_bool = not l_orderkey_bool;
""")


# Delete the rows whose l_extendedprice_double is below 10000 ...
spark.sql("""
delete
from default.pyspark_iceberg_table
where l_extendedprice_double < 10000;
""")

# ... and the rows whose l_extendedprice_double is above 70000.
spark.sql("""
delete
from default.pyspark_iceberg_table
where l_extendedprice_double > 70000;
""")

# Schema evolution, step 1: add a new INT column declared with DEFAULT 42.
spark.sql("""
ALTER TABLE default.pyspark_iceberg_table
ADD COLUMN schema_evol_added_col_1 INT DEFAULT 42;
""")

# Step 2: copy l_partkey_int into the new column where l_partkey_int is a
# multiple of 5.
spark.sql("""
UPDATE default.pyspark_iceberg_table
SET schema_evol_added_col_1 = l_partkey_int
WHERE l_partkey_int % 5 = 0;
""")

# Step 3: change the new column's type from INT to BIGINT.
spark.sql("""
ALTER TABLE default.pyspark_iceberg_table
ALTER COLUMN schema_evol_added_col_1 TYPE BIGINT;
""")
4 changes: 2 additions & 2 deletions test/sql/local/iceberg_catalog_read.test
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ ATTACH '' AS my_datalake (TYPE ICEBERG);
query IIIIII
Show all tables;
----
my_datalake default lineitem [___] [INTEGER] false
my_datalake default pyspark_iceberg_table [__] [INTEGER] false
my_datalake default table_mor_deletes [__] [INTEGER] false
my_datalake default table_partitioned [__] [INTEGER] false
my_datalake default table_unpartitioned [__] [INTEGER] false
Expand Down Expand Up @@ -76,7 +76,7 @@ select * from my_datalake.default.table_mor_deletes order by all;
2023-03-12 12 l

statement error
select * from my_datalake.default.lineitem;
select * from my_datalake.default.pyspark_iceberg_table;
----
<REGEX>:.*Not implemented Error.*

Expand Down

0 comments on commit c5d7790

Please sign in to comment.