diff --git a/.gitignore b/.gitignore index cb608ef..0d2864b 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,5 @@ data/iceberg/generated_* scripts/metastore_db/ scripts/derby.log scripts/test-script-with-path.sql +*.parquet +src/include/jiceberg_generated/ diff --git a/.gitmodules b/.gitmodules index dd490ea..b06efe8 100644 --- a/.gitmodules +++ b/.gitmodules @@ -5,3 +5,6 @@ [submodule "extension-ci-tools"] path = extension-ci-tools url = https://github.com/duckdb/extension-ci-tools.git +[submodule "arrow"] + path = arrow + url = https://github.com/apache/arrow.git diff --git a/CMakeLists.txt b/CMakeLists.txt index fe79363..6c4c1ae 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -4,21 +4,38 @@ cmake_minimum_required(VERSION 2.8.12) set(TARGET_NAME iceberg) project(${TARGET_NAME}) -set(CMAKE_CXX_STANDARD 14) +set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD_REQUIRED True) set(EXTENSION_NAME ${TARGET_NAME}_extension) -include_directories(src/include) +include_directories( + src/include + arrow/cpp/src + arrow/cpp/build/src +) set(EXTENSION_SOURCES src/iceberg_extension.cpp src/iceberg_functions.cpp + src/catalog_api.cpp + src/catalog_utils.cpp + src/ic_create_table_as_op.cpp src/common/utils.cpp src/common/schema.cpp src/common/iceberg.cpp src/iceberg_functions/iceberg_snapshots.cpp src/iceberg_functions/iceberg_scan.cpp - src/iceberg_functions/iceberg_metadata.cpp) + src/iceberg_functions/iceberg_metadata.cpp + src/storage/ic_catalog.cpp + src/storage/ic_catalog_set.cpp + src/storage/ic_clear_cache.cpp + src/storage/ic_schema_entry.cpp + src/storage/ic_schema_set.cpp + src/storage/ic_table_entry.cpp + src/storage/ic_table_set.cpp + src/storage/ic_transaction.cpp + src/storage/ic_transaction_manager.cpp +) add_library(${EXTENSION_NAME} STATIC ${EXTENSION_SOURCES}) @@ -73,11 +90,20 @@ target_link_libraries( Snappy::snappy ZLIB::ZLIB) +find_package(CURL REQUIRED) + +find_library(ARROW_LIB arrow PATHS arrow/cpp/build/release) +find_library(PARQUET_LIB parquet PATHS arrow/cpp/build/release) +find_library(FTICEBERGHELPER_LIB jiceberg PATHS jiceberg_lib/app/build/native/nativeCompile) + # Link dependencies into extension -target_link_libraries(${EXTENSION_NAME} PUBLIC optimized avro_static_release - debug avro_static_debug) -target_link_libraries(${TARGET_NAME}_loadable_extension optimized - avro_static_release debug avro_static_debug) +target_link_libraries(${EXTENSION_NAME} PUBLIC optimized avro_static_release + debug avro_static_debug + ${FTICEBERGHELPER_LIB} CURL::libcurl + ${ARROW_LIB} ${PARQUET_LIB}) +target_link_libraries(${TARGET_NAME}_loadable_extension optimized CURL::libcurl + avro_static_release ${FTICEBERGHELPER_LIB} ${ARROW_LIB} ${PARQUET_LIB} + debug avro_static_debug) install( TARGETS ${EXTENSION_NAME} ${TARGET_NAME}_loadable_extension diff --git a/README-ADDENDUM.md b/README-ADDENDUM.md new file mode 100644 index 0000000..f992200 --- /dev/null +++ b/README-ADDENDUM.md @@ -0,0 +1,127 @@ +# ADDENDUM + +This fork adds proof-of-concept functionality to DuckDB iceberg extension to be able to connect to an iceberg catalog and write to iceberg tables as well as read from them. + +# Requirements +You will need the following to be able to use this new version of the extension: +1. DuckDB version 1.2.0 or later +2. `httpfs` extension + +Since this extension is not official yet, you will need to run duckdb in `unsigned` mode to be able to use it: +```bash +duckdb --unsigned +``` + +# Installation +The following steps need to be done once: +1. 
Download the zip from GitHub and unzip it
+2. Change into the directory where you unzipped the files
+3. Install the extension
+```sql
+INSTALL './iceberg.duckdb_extension';
+```
+4. If you already have the official `iceberg` extension installed, you will need to force the install
+```sql
+FORCE INSTALL './iceberg.duckdb_extension';
+```
+5. Install the `httpfs` extension if you don't have it installed already
+```sql
+INSTALL httpfs;
+```
+
+# Usage
+## Load `httpfs` and `iceberg` extensions
+```sql
+LOAD httpfs;
+LOAD iceberg;
+```
+## Create a secret to provide access to an iceberg catalog
+```sql
+CREATE SECRET (
+    TYPE ICEBERG,
+    CLIENT_ID '${CLIENT_ID}',
+    CLIENT_SECRET '${CLIENT_SECRET}',
+    ENDPOINT '${ENDPOINT}',
+    AWS_REGION '${AWS_REGION}'
+)
+```
+
+## Attach an iceberg catalog
+```sql
+ATTACH 'my_catalog' AS my_catalog (TYPE ICEBERG)
+```
+
+## Read an iceberg table
+```sql
+SELECT * FROM my_catalog.my_schema.table_1;
+```
+
+## Create a new iceberg table
+```sql
+CREATE TABLE my_catalog.my_schema.new_table (id BIGINT, name VARCHAR);
+```
+```sql
+CREATE TABLE my_catalog.my_schema.new_table_2 AS (SELECT version() AS "version");
+```
+
+## Delete an existing iceberg table
+```sql
+DROP TABLE my_catalog.my_schema.table_1;
+```
+
+# How to build the extension from source
+Requirements:
+* A compiler that supports C++17
+* CMake version 3.28 or later
+```
+git clone https://github.com/fivetran/duckdb-iceberg.git
+git submodule update --init --recursive
+brew install ninja   # or install Ninja with your platform's package manager
+GEN=ninja make {debug/release}
+```
+
+# Roadmap
+## 1. Supported SQL commands
+### ✅ CREATE SECRET
+### ✅ ATTACH
+### 🔳 USE
+### ✅ SELECT
+### ✅ CREATE SCHEMA
+### ✅ DROP SCHEMA
+### 🔳 CREATE VIEW
+### 🔳 DROP VIEW
+### ✅ CREATE TABLE
+### ✅ CREATE TABLE AS SELECT
+### 🔳 ALTER TABLE
+### ✅ DROP TABLE
+### 🔳 INSERT
+### 🔳 UPDATE
+### 🔳 DELETE
+
+## 2. Supported [Iceberg data types](https://docs.snowflake.com/en/user-guide/tables-iceberg-data-types) (writing)
+### 🔳 boolean
+### ✅ string
+### 🔳 tinyint
+### 🔳 smallint
+### ✅ int
+### ✅ long
+### ✅ double
+### 🔳 float
+### 🔳 timestamp
+### 🔳 timestamptz
+### 🔳 binary
+### 🔳 date
+### 🔳 decimal(prec,scale)
+### 🔳 array
+### 🔳 map
+### 🔳 struct
+
+## 3. 
Miscellaneous +### 🔳 Bundle `jiceberg` statically into the extension + + + + + + + diff --git a/arrow b/arrow new file mode 160000 index 0000000..f872372 --- /dev/null +++ b/arrow @@ -0,0 +1 @@ +Subproject commit f8723722341df31c0091c91ec451319ded58c214 diff --git a/duckdb b/duckdb index f99785b..5f5512b 160000 --- a/duckdb +++ b/duckdb @@ -1 +1 @@ -Subproject commit f99785b78ae4724b31d9b41435ad8c17e57ee8f4 +Subproject commit 5f5512b827df6397afd31daedb4bbdee76520019 diff --git a/jiceberg_lib/.gitattributes b/jiceberg_lib/.gitattributes new file mode 100644 index 0000000..f91f646 --- /dev/null +++ b/jiceberg_lib/.gitattributes @@ -0,0 +1,12 @@ +# +# https://help.github.com/articles/dealing-with-line-endings/ +# +# Linux start script should use lf +/gradlew text eol=lf + +# These are Windows script files and should use crlf +*.bat text eol=crlf + +# Binary files should be left untouched +*.jar binary + diff --git a/jiceberg_lib/.gitignore b/jiceberg_lib/.gitignore new file mode 100644 index 0000000..45c993a --- /dev/null +++ b/jiceberg_lib/.gitignore @@ -0,0 +1,6 @@ +# Ignore Gradle project-specific cache directory +.gradle/ +.idea/ + +# Ignore Gradle build output directory +build/ diff --git a/jiceberg_lib/README.md b/jiceberg_lib/README.md new file mode 100644 index 0000000..19ecfe7 --- /dev/null +++ b/jiceberg_lib/README.md @@ -0,0 +1,19 @@ +Apache Iceberg does not have a library written in C++. This project uses the Java library +to create helper methods to be used in C++. + +# Instructions +* Install [SDKMAN](https://sdkman.io/install/) +* Install GraalVM CE 21 +```build +sdk install java 21.0.2-graalce +``` +* Build library +``` +gradle nativeBuild +``` +* Copy header files +``` +./scripts/copy_headers.sh +``` +* In `src/include/jiceberg_generated`, replace includes with "quotes" in the header files starting with `libjiceberg...` + diff --git a/jiceberg_lib/app/build.gradle b/jiceberg_lib/app/build.gradle new file mode 100644 index 0000000..6484007 --- /dev/null +++ b/jiceberg_lib/app/build.gradle @@ -0,0 +1,75 @@ +/* + * This file was generated by the Gradle 'init' task. + * + * This generated file contains a sample Java application project to get you started. + * For more details on building Java & JVM projects, please refer to https://docs.gradle.org/8.9/userguide/building_java_projects.html in the Gradle documentation. 
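+ *
+ * Fork note: `gradle nativeBuild` runs GraalVM native-image with the `--shared`
+ * flag configured below, so the artifacts in app/build/native/nativeCompile are a
+ * native shared library (libjiceberg) plus generated C headers for each
+ * @CEntryPoint method; scripts/copy_headers.sh then copies those headers into
+ * src/include/jiceberg_generated/ for the extension build.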
+ */ + +plugins { + id 'java' + id 'application' + id 'org.graalvm.buildtools.native' version '0.9.28' +} + +repositories { + mavenCentral() +} + +dependencies { + implementation 'org.apache.iceberg:iceberg-core:1.7.1' + implementation 'org.apache.iceberg:iceberg-api:1.7.1' + implementation 'org.apache.iceberg:iceberg-aws:1.7.1' + implementation 'org.apache.iceberg:iceberg-parquet:1.7.1' + implementation 'org.apache.iceberg:iceberg-data:1.7.1' + implementation 'org.apache.parquet:parquet-avro:1.12.3' + implementation 'org.apache.hadoop:hadoop-common:3.3.1' + + implementation 'software.amazon.awssdk:s3:2.30.11' + implementation 'software.amazon.awssdk:sts:2.30.11' + implementation 'software.amazon.awssdk:auth:2.30.11' + implementation 'software.amazon.awssdk:regions:2.30.11' + + compileOnly 'org.graalvm.sdk:graal-sdk:22.0.0' + implementation 'org.slf4j:slf4j-nop:2.0.9' + implementation 'org.slf4j:log4j-over-slf4j:2.0.9' +} + +application { + mainClass = 'com.fivetran.iceberg.App' // Update with your package and class name +} + +graalvmNative { + binaries { + main { + imageName = 'libjiceberg' // Name of the native executable + mainClass = 'com.fivetran.iceberg.App' // Update with your package and class name + fallback = false + buildArgs.add('--shared') + buildArgs.add('--initialize-at-run-time=org.apache.log4j') + buildArgs.add('--initialize-at-run-time=org.slf4j') + buildArgs.add('--initialize-at-run-time=com.github.benmanes.caffeine.cache.LocalLoadingCache') + buildArgs.add('--initialize-at-run-time=org.apache.iceberg.util.Pair') + buildArgs.add('--initialize-at-run-time=org.apache.logging.log4j.util.ProviderUtil') + buildArgs.add('--initialize-at-run-time=org.apache.logging.log4j.core.impl.Log4jContextFactory') + buildArgs.add('--initialize-at-run-time=org.apache.commons.logging.LogFactory') + buildArgs.add('--initialize-at-build-time=org.slf4j,org.apache.logging.slf4j') + buildArgs.add('-H:+ReportExceptionStackTraces') + buildArgs.add('--enable-https') + + metadataRepository { + enabled = true + } + configurationFileDirectories.from(file('src/main/resources/META-INF/native-image-config')) + } + } +} + +def void main(Closure booleanClosure) {} + +tasks.register('runWithAgent', JavaExec) { + classpath = sourceSets.main.runtimeClasspath + mainClass = 'com.fivetran.iceberg.App' + jvmArgs = [ + '-agentlib:native-image-agent=config-output-dir=src/main/resources/META-INF/native-image-config' + ] +} \ No newline at end of file diff --git a/jiceberg_lib/app/src/main/java/com/fivetran/iceberg/App.java b/jiceberg_lib/app/src/main/java/com/fivetran/iceberg/App.java new file mode 100644 index 0000000..f69e56d --- /dev/null +++ b/jiceberg_lib/app/src/main/java/com/fivetran/iceberg/App.java @@ -0,0 +1,136 @@ +package com.fivetran.iceberg; + +import org.apache.iceberg.*; +import org.apache.iceberg.io.*; +import org.apache.iceberg.rest.RESTCatalog; +import org.apache.iceberg.catalog.TableIdentifier; + +import org.graalvm.nativeimage.IsolateThread; +import org.graalvm.nativeimage.c.function.CEntryPoint; +import org.graalvm.nativeimage.c.type.CCharPointer; +import org.graalvm.nativeimage.c.type.CTypeConversion; +import org.graalvm.word.WordFactory; + +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + + +public class App { + @CEntryPoint(name = "append_to_table") + public static CCharPointer appendToTable( + IsolateThread thread, + CCharPointer uri, + CCharPointer creds, + CCharPointer warehouse, 
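+            // The remaining parameters name the target table; `filename` is a locally
+            // staged Parquet file (with numRecords rows) that append() uploads into the
+            // table's data/ prefix and then commits as a new snapshot.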
+            CCharPointer schemaName,
+            CCharPointer tableName,
+            CCharPointer filename,
+            int numRecords) {
+        try {
+            String newMetadataLoc = append(
+                    CTypeConversion.toJavaString(uri),
+                    CTypeConversion.toJavaString(creds),
+                    CTypeConversion.toJavaString(warehouse),
+                    CTypeConversion.toJavaString(schemaName),
+                    CTypeConversion.toJavaString(tableName),
+                    CTypeConversion.toJavaString(filename),
+                    numRecords);
+            // NOTE: the holder must stay open when we return; wrapping it in
+            // try-with-resources would free the buffer before the C caller can read it.
+            // This leaks one small string per call; a fuller implementation would copy
+            // into UnmanagedMemory and expose a matching free entry point.
+            CTypeConversion.CCharPointerHolder holder = CTypeConversion.toCString(newMetadataLoc);
+            return holder.get();
+        } catch (IOException e) {
+            return WordFactory.nullPointer();
+        } finally {
+            // Delete the local file no matter what
+            Path localPath = Paths.get(CTypeConversion.toJavaString(filename));
+            try {
+                java.nio.file.Files.deleteIfExists(localPath);
+            } catch (IOException e) {
+                System.out.println("Unable to delete datafile");
+            }
+        }
+    }
+
+    public static void main(String[] args) {
+        try {
+            System.out.println(
+                    append("https://polaris.fivetran.com/api/catalog",
+                            "client_id:client_secret",
+                            "catalog",
+                            "schema",
+                            "new_table",
+                            "datafile.parquet",
+                            1
+                    ));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    static String append(
+            String uri,
+            String creds,
+            String warehouse,
+            String schemaName,
+            String tableName,
+            String filename,
+            int numRecords) throws IOException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CatalogProperties.URI, uri);
+        properties.put("credential", creds);
+        properties.put("oauth2-server-uri", uri + "/v1/oauth/tokens");
+        properties.put("scope", "PRINCIPAL_ROLE:ALL");
+        properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse);
+        properties.put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.aws.s3.S3FileIO");
+
+        try (RESTCatalog catalog = new RESTCatalog()) {
+            catalog.initialize("catalog", properties);
+            TableIdentifier tableId = TableIdentifier.of(schemaName, tableName);
+            Table table = catalog.loadTable(tableId);
+
+            // Copy the local file to the S3-backed OutputFile
+            Path localPath = Paths.get(filename);
+            try (FileIO fileIO = table.io()) {
+                OutputFile outputFile = fileIO.newOutputFile(
+                        table.location() + "/data/" + filename);
+                try (PositionOutputStream out = outputFile.createOrOverwrite()) {
+                    // Use Java's Files.copy to transfer the contents
+                    java.nio.file.Files.copy(localPath, out);
+                }
+
+                PartitionSpec spec = table.spec();
+                DataFile dataFile = DataFiles.builder(spec)
+                        .withRecordCount(numRecords)
+                        .withFileSizeInBytes(outputFile.toInputFile().getLength())
+                        .withInputFile(outputFile.toInputFile())
+                        .withFormat(FileFormat.PARQUET)
+                        .build();
+
+                // Force a new metadata commit
+                table.updateProperties()
+                        .set("last-modified-time", String.valueOf(System.currentTimeMillis()))
+                        .commit();
+
+                // Optionally, you can try to expire old metadata
+                table.expireSnapshots()
+                        .expireOlderThan(System.currentTimeMillis())
+                        .commit();
+
+                // Append the data file
+                table.newAppend()
+                        .appendFile(dataFile)
+                        .set("write.format.default", "parquet")
+                        .set("format-version", "2")
+                        .set("write.metadata.compression-codec", "none")
+                        .set("snapshot-id", UUID.randomUUID().toString()) // Force new snapshot
+                        .commit();
+
+                return ((BaseTable) table).operations().current().metadataFileLocation();
+            }
+        }
+    }
+}
diff --git a/jiceberg_lib/app/src/main/resources/META-INF/native-image-config/jni-config.json b/jiceberg_lib/app/src/main/resources/META-INF/native-image-config/jni-config.json new file mode 100644 index 0000000..75f6460 --- /dev/null +++ 
b/jiceberg_lib/app/src/main/resources/META-INF/native-image-config/jni-config.json @@ -0,0 +1,18 @@ +[ +{ + "name":"com.fivetran.iceberg.App", + "methods":[{"name":"main","parameterTypes":["java.lang.String[]"] }] +}, +{ + "name":"java.lang.Boolean", + "methods":[{"name":"getBoolean","parameterTypes":["java.lang.String"] }] +}, +{ + "name":"java.lang.String", + "methods":[{"name":"lastIndexOf","parameterTypes":["int"] }, {"name":"substring","parameterTypes":["int"] }] +}, +{ + "name":"java.lang.System", + "methods":[{"name":"getProperty","parameterTypes":["java.lang.String"] }, {"name":"setProperty","parameterTypes":["java.lang.String","java.lang.String"] }] +} +] diff --git a/jiceberg_lib/app/src/main/resources/META-INF/native-image-config/predefined-classes-config.json b/jiceberg_lib/app/src/main/resources/META-INF/native-image-config/predefined-classes-config.json new file mode 100644 index 0000000..0e79b2c --- /dev/null +++ b/jiceberg_lib/app/src/main/resources/META-INF/native-image-config/predefined-classes-config.json @@ -0,0 +1,8 @@ +[ + { + "type":"agent-extracted", + "classes":[ + ] + } +] + diff --git a/jiceberg_lib/app/src/main/resources/META-INF/native-image-config/proxy-config.json b/jiceberg_lib/app/src/main/resources/META-INF/native-image-config/proxy-config.json new file mode 100644 index 0000000..0d4f101 --- /dev/null +++ b/jiceberg_lib/app/src/main/resources/META-INF/native-image-config/proxy-config.json @@ -0,0 +1,2 @@ +[ +] diff --git a/jiceberg_lib/app/src/main/resources/META-INF/native-image-config/reflect-config.json b/jiceberg_lib/app/src/main/resources/META-INF/native-image-config/reflect-config.json new file mode 100644 index 0000000..7fd21c4 --- /dev/null +++ b/jiceberg_lib/app/src/main/resources/META-INF/native-image-config/reflect-config.json @@ -0,0 +1,443 @@ +[ +{ + "name":"[B" +}, +{ + "name":"[Lcom.fasterxml.jackson.databind.deser.Deserializers;" +}, +{ + "name":"[Lcom.fasterxml.jackson.databind.ser.Serializers;" +}, +{ + "name":"[Ljava.lang.String;" +}, +{ + "name":"[Ljava.util.List;" +}, +{ + "name":"[Lorg.apache.avro.util.springframework.ConcurrentReferenceHashMap$Segment;" +}, +{ + "name":"[Lorg.apache.iceberg.avro.ValueWriter;" +}, +{ + "name":"[Lsun.security.pkcs.SignerInfo;" +}, +{ + "name":"apple.security.AppleProvider", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"com.fasterxml.jackson.databind.ext.Java7SupportImpl", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"com.github.benmanes.caffeine.cache.BLCHeader$DrainStatusRef", + "fields":[{"name":"drainStatus"}] +}, +{ + "name":"com.github.benmanes.caffeine.cache.BaseMpscLinkedArrayQueueColdProducerFields", + "fields":[{"name":"producerLimit"}] +}, +{ + "name":"com.github.benmanes.caffeine.cache.BaseMpscLinkedArrayQueueConsumerFields", + "fields":[{"name":"consumerIndex"}] +}, +{ + "name":"com.github.benmanes.caffeine.cache.BaseMpscLinkedArrayQueueProducerFields", + "fields":[{"name":"producerIndex"}] +}, +{ + "name":"com.github.benmanes.caffeine.cache.CacheLoader", + "methods":[{"name":"loadAll","parameterTypes":["java.lang.Iterable"] }] +}, +{ + "name":"com.github.benmanes.caffeine.cache.FD", + "fields":[{"name":"value"}] +}, +{ + "name":"com.github.benmanes.caffeine.cache.FDMS", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"com.github.benmanes.caffeine.cache.FS", + "fields":[{"name":"key"}, {"name":"value"}], + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"com.github.benmanes.caffeine.cache.PS", + "fields":[{"name":"key"}, 
{"name":"value"}] +}, +{ + "name":"com.github.benmanes.caffeine.cache.PSA", + "fields":[{"name":"accessTime"}], + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"com.github.benmanes.caffeine.cache.PW", + "fields":[{"name":"value"}], + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"com.github.benmanes.caffeine.cache.SI", + "methods":[{"name":"","parameterTypes":["com.github.benmanes.caffeine.cache.Caffeine","com.github.benmanes.caffeine.cache.CacheLoader","boolean"] }] +}, +{ + "name":"com.github.benmanes.caffeine.cache.SSA", + "methods":[{"name":"","parameterTypes":["com.github.benmanes.caffeine.cache.Caffeine","com.github.benmanes.caffeine.cache.CacheLoader","boolean"] }] +}, +{ + "name":"com.github.benmanes.caffeine.cache.SSLA", + "methods":[{"name":"","parameterTypes":["com.github.benmanes.caffeine.cache.Caffeine","com.github.benmanes.caffeine.cache.CacheLoader","boolean"] }] +}, +{ + "name":"com.github.benmanes.caffeine.cache.StripedBuffer", + "fields":[{"name":"tableBusy"}] +}, +{ + "name":"com.github.benmanes.caffeine.cache.WILSMS", + "methods":[{"name":"","parameterTypes":["com.github.benmanes.caffeine.cache.Caffeine","com.github.benmanes.caffeine.cache.CacheLoader","boolean"] }] +}, +{ + "name":"com.github.benmanes.caffeine.cache.WSL", + "methods":[{"name":"","parameterTypes":["com.github.benmanes.caffeine.cache.Caffeine","com.github.benmanes.caffeine.cache.CacheLoader","boolean"] }] +}, +{ + "name":"com.sun.crypto.provider.AESCipher$General", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"com.sun.crypto.provider.ARCFOURCipher", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"com.sun.crypto.provider.ChaCha20Cipher$ChaCha20Poly1305", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"com.sun.crypto.provider.DESCipher", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"com.sun.crypto.provider.DESedeCipher", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"com.sun.crypto.provider.DHParameters", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"com.sun.crypto.provider.GaloisCounterMode$AESGCM", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"com.sun.crypto.provider.HmacCore$HmacSHA256", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"com.sun.crypto.provider.HmacCore$HmacSHA384", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"com.sun.crypto.provider.TlsMasterSecretGenerator", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"java.lang.String" +}, +{ + "name":"java.lang.Thread", + "fields":[{"name":"threadLocalRandomProbe"}] +}, +{ + "name":"java.security.AlgorithmParametersSpi" +}, +{ + "name":"java.security.KeyStoreSpi" +}, +{ + "name":"java.security.SecureRandomParameters" +}, +{ + "name":"java.security.interfaces.ECPrivateKey" +}, +{ + "name":"java.security.interfaces.ECPublicKey" +}, +{ + "name":"java.security.interfaces.RSAPrivateKey" +}, +{ + "name":"java.security.interfaces.RSAPublicKey" +}, +{ + "name":"java.util.Date" +}, +{ + "name":"java.util.concurrent.ForkJoinTask", + "fields":[{"name":"aux"}, {"name":"status"}] +}, +{ + "name":"java.util.concurrent.atomic.AtomicBoolean", + "fields":[{"name":"value"}] +}, +{ + "name":"java.util.concurrent.atomic.AtomicMarkableReference", + "fields":[{"name":"pair"}] +}, +{ + "name":"java.util.concurrent.atomic.AtomicReference", + "fields":[{"name":"value"}] +}, +{ + "name":"java.util.concurrent.atomic.Striped64", + "fields":[{"name":"base"}, {"name":"cellsBusy"}] 
+}, +{ + "name":"javax.security.auth.x500.X500Principal", + "fields":[{"name":"thisX500Name"}], + "methods":[{"name":"","parameterTypes":["sun.security.x509.X500Name"] }] +}, +{ + "name":"jdk.internal.misc.Unsafe" +}, +{ + "name":"kotlin.Unit" +}, +{ + "name":"org.apache.commons.logging.LogFactory" +}, +{ + "name":"org.apache.commons.logging.impl.Log4JLogger", + "methods":[{"name":"","parameterTypes":["java.lang.String"] }, {"name":"setLogFactory","parameterTypes":["org.apache.commons.logging.LogFactory"] }] +}, +{ + "name":"org.apache.commons.logging.impl.LogFactoryImpl", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"org.apache.commons.logging.impl.WeakHashtable", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"org.apache.hadoop.fs.FSDataInputStream" +}, +{ + "name":"org.apache.http.client.config.RequestConfig$Builder", + "methods":[{"name":"setNormalizeUri","parameterTypes":["boolean"] }] +}, +{ + "name":"org.apache.iceberg.GenericDataFile", + "methods":[{"name":"","parameterTypes":["org.apache.iceberg.types.Types$StructType"] }] +}, +{ + "name":"org.apache.iceberg.GenericManifestEntry", + "methods":[{"name":"","parameterTypes":["org.apache.iceberg.types.Types$StructType"] }] +}, +{ + "name":"org.apache.iceberg.GenericManifestFile", + "methods":[{"name":"","parameterTypes":["org.apache.avro.Schema"] }] +}, +{ + "name":"org.apache.iceberg.GenericPartitionFieldSummary", + "methods":[{"name":"","parameterTypes":["org.apache.avro.Schema"] }] +}, +{ + "name":"org.apache.iceberg.PartitionData", + "methods":[{"name":"","parameterTypes":["org.apache.iceberg.types.Types$StructType"] }] +}, +{ + "name":"org.apache.iceberg.aws.ApacheHttpClientConfigurations", + "methods":[{"name":"create","parameterTypes":["java.util.Map"] }] +}, +{ + "name":"org.apache.iceberg.aws.AwsClientFactories$DefaultAwsClientFactory", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"org.apache.iceberg.aws.s3.S3FileIO", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"org.apache.iceberg.hadoop.HadoopMetricsContext", + "methods":[{"name":"","parameterTypes":["java.lang.String"] }] +}, +{ + "name":"org.apache.iceberg.relocated.com.google.common.util.concurrent.AbstractFuture", + "fields":[{"name":"listeners"}, {"name":"value"}, {"name":"waiters"}] +}, +{ + "name":"org.apache.iceberg.relocated.com.google.common.util.concurrent.AbstractFuture$Waiter", + "fields":[{"name":"next"}, {"name":"thread"}] +}, +{ + "name":"org.apache.iceberg.util.Pair$1" +}, +{ + "name":"org.apache.log4j.Level", + "fields":[{"name":"TRACE"}] +}, +{ + "name":"org.apache.log4j.Priority" +}, +{ + "name":"org.brotli.dec.BrotliInputStream" +}, +{ + "name":"scala.util.Properties" +}, +{ + "name":"sun.misc.Unsafe", + "allDeclaredFields":true +}, +{ + "name":"sun.security.pkcs12.PKCS12KeyStore", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"sun.security.pkcs12.PKCS12KeyStore$DualFormatPKCS12", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"sun.security.provider.DSA$SHA224withDSA", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"sun.security.provider.DSA$SHA256withDSA", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"sun.security.provider.JavaKeyStore$DualFormatJKS", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"sun.security.provider.JavaKeyStore$JKS", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"sun.security.provider.MD5", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + 
"name":"sun.security.provider.NativePRNG", + "methods":[{"name":"","parameterTypes":[] }, {"name":"","parameterTypes":["java.security.SecureRandomParameters"] }] +}, +{ + "name":"sun.security.provider.SHA", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"sun.security.provider.SHA2$SHA224", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"sun.security.provider.SHA2$SHA256", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"sun.security.provider.SHA5$SHA384", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"sun.security.provider.SHA5$SHA512", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"sun.security.provider.X509Factory", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"sun.security.provider.certpath.PKIXCertPathValidator", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"sun.security.rsa.PSSParameters", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"sun.security.rsa.RSAKeyFactory$Legacy", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"sun.security.rsa.RSAPSSSignature", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"sun.security.rsa.RSASignature$SHA224withRSA", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"sun.security.rsa.RSASignature$SHA256withRSA", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"sun.security.ssl.KeyManagerFactoryImpl$SunX509", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"sun.security.ssl.SSLContextImpl$DefaultSSLContext", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"sun.security.ssl.SSLContextImpl$TLSContext", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"sun.security.ssl.TrustManagerFactoryImpl$PKIXFactory", + "methods":[{"name":"","parameterTypes":[] }] +}, +{ + "name":"sun.security.util.ObjectIdentifier" +}, +{ + "name":"sun.security.x509.AuthorityInfoAccessExtension", + "methods":[{"name":"","parameterTypes":["java.lang.Boolean","java.lang.Object"] }] +}, +{ + "name":"sun.security.x509.AuthorityKeyIdentifierExtension", + "methods":[{"name":"","parameterTypes":["java.lang.Boolean","java.lang.Object"] }] +}, +{ + "name":"sun.security.x509.BasicConstraintsExtension", + "methods":[{"name":"","parameterTypes":["java.lang.Boolean","java.lang.Object"] }] +}, +{ + "name":"sun.security.x509.CRLDistributionPointsExtension", + "methods":[{"name":"","parameterTypes":["java.lang.Boolean","java.lang.Object"] }] +}, +{ + "name":"sun.security.x509.CertificateExtensions" +}, +{ + "name":"sun.security.x509.CertificatePoliciesExtension", + "methods":[{"name":"","parameterTypes":["java.lang.Boolean","java.lang.Object"] }] +}, +{ + "name":"sun.security.x509.ExtendedKeyUsageExtension", + "methods":[{"name":"","parameterTypes":["java.lang.Boolean","java.lang.Object"] }] +}, +{ + "name":"sun.security.x509.IssuerAlternativeNameExtension", + "methods":[{"name":"","parameterTypes":["java.lang.Boolean","java.lang.Object"] }] +}, +{ + "name":"sun.security.x509.KeyUsageExtension", + "methods":[{"name":"","parameterTypes":["java.lang.Boolean","java.lang.Object"] }] +}, +{ + "name":"sun.security.x509.NetscapeCertTypeExtension", + "methods":[{"name":"","parameterTypes":["java.lang.Boolean","java.lang.Object"] }] +}, +{ + "name":"sun.security.x509.PrivateKeyUsageExtension", + "methods":[{"name":"","parameterTypes":["java.lang.Boolean","java.lang.Object"] }] +}, +{ + "name":"sun.security.x509.SubjectAlternativeNameExtension", + 
"methods":[{"name":"","parameterTypes":["java.lang.Boolean","java.lang.Object"] }] +}, +{ + "name":"sun.security.x509.SubjectKeyIdentifierExtension", + "methods":[{"name":"","parameterTypes":["java.lang.Boolean","java.lang.Object"] }] +} +] diff --git a/jiceberg_lib/app/src/main/resources/META-INF/native-image-config/resource-config.json b/jiceberg_lib/app/src/main/resources/META-INF/native-image-config/resource-config.json new file mode 100644 index 0000000..040692b --- /dev/null +++ b/jiceberg_lib/app/src/main/resources/META-INF/native-image-config/resource-config.json @@ -0,0 +1,57 @@ +{ + "resources":{ + "includes":[{ + "pattern":"\\QMETA-INF/maven/org.xerial.snappy/snappy-java/pom.properties\\E" + }, { + "pattern":"\\QMETA-INF/services/java.lang.System$LoggerFinder\\E" + }, { + "pattern":"\\QMETA-INF/services/java.net.spi.InetAddressResolverProvider\\E" + }, { + "pattern":"\\QMETA-INF/services/java.net.spi.URLStreamHandlerProvider\\E" + }, { + "pattern":"\\QMETA-INF/services/org.apache.avro.Conversion\\E" + }, { + "pattern":"\\QMETA-INF/services/org.apache.avro.LogicalTypes$LogicalTypeFactory\\E" + }, { + "pattern":"\\QMETA-INF/services/org.apache.commons.logging.LogFactory\\E" + }, { + "pattern":"\\QMETA-INF/services/org.slf4j.spi.SLF4JServiceProvider\\E" + }, { + "pattern":"\\Qcommons-logging.properties\\E" + }, { + "pattern":"\\Qcore-default.xml\\E" + }, { + "pattern":"\\Qcore-site.xml\\E" + }, { + "pattern":"\\Qhadoop-site.xml\\E" + }, { + "pattern":"\\Qiceberg-build.properties\\E" + }, { + "pattern":"\\Qmozilla/public-suffix-list.txt\\E" + }, { + "pattern":"\\Qorg-xerial-snappy.properties\\E" + }, { + "pattern":"\\Qorg/apache/hc/client5/http/psl/org/publicsuffix/list/effective_tld_names.dat\\E" + }, { + "pattern":"\\Qorg/apache/hc/client5/version.properties\\E" + }, { + "pattern":"\\Qorg/xerial/snappy/VERSION\\E" + }, { + "pattern":"\\Qorg/xerial/snappy/native/Mac/aarch64/libsnappyjava.dylib\\E" + }, { + "pattern":"\\Qsoftware/amazon/awssdk/global/handlers/execution.interceptors\\E" + }, { + "pattern":"\\Qsoftware/amazon/awssdk/global/partitions.json\\E" + }, { + "pattern":"\\Qsoftware/amazon/awssdk/services/s3/execution.interceptors\\E" + }, { + "pattern":"java.base:\\Qjdk/internal/icu/impl/data/icudt72b/nfc.nrm\\E" + }, { + "pattern":"java.base:\\Qjdk/internal/icu/impl/data/icudt72b/nfkc.nrm\\E" + }, { + "pattern":"java.base:\\Qjdk/internal/icu/impl/data/icudt72b/uprops.icu\\E" + }, { + "pattern":"java.base:\\Qsun/net/idn/uidna.spp\\E" + }]}, + "bundles":[] +} diff --git a/jiceberg_lib/app/src/main/resources/META-INF/native-image-config/serialization-config.json b/jiceberg_lib/app/src/main/resources/META-INF/native-image-config/serialization-config.json new file mode 100644 index 0000000..f3d7e06 --- /dev/null +++ b/jiceberg_lib/app/src/main/resources/META-INF/native-image-config/serialization-config.json @@ -0,0 +1,8 @@ +{ + "types":[ + ], + "lambdaCapturingTypes":[ + ], + "proxies":[ + ] +} diff --git a/jiceberg_lib/gradle/libs.versions.toml b/jiceberg_lib/gradle/libs.versions.toml new file mode 100644 index 0000000..334ec20 --- /dev/null +++ b/jiceberg_lib/gradle/libs.versions.toml @@ -0,0 +1,8 @@ +# This file was generated by the Gradle 'init' task. 
+# https://docs.gradle.org/current/userguide/platforms.html#sub::toml-dependencies-format + +[versions] +guava = "33.1.0-jre" + +[libraries] +guava = { module = "com.google.guava:guava", version.ref = "guava" } diff --git a/jiceberg_lib/gradle/wrapper/gradle-wrapper.jar b/jiceberg_lib/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 0000000..2c35211 Binary files /dev/null and b/jiceberg_lib/gradle/wrapper/gradle-wrapper.jar differ diff --git a/jiceberg_lib/gradle/wrapper/gradle-wrapper.properties b/jiceberg_lib/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 0000000..09523c0 --- /dev/null +++ b/jiceberg_lib/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,7 @@ +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-8.9-bin.zip +networkTimeout=10000 +validateDistributionUrl=true +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists diff --git a/jiceberg_lib/gradlew b/jiceberg_lib/gradlew new file mode 100755 index 0000000..f5feea6 --- /dev/null +++ b/jiceberg_lib/gradlew @@ -0,0 +1,252 @@ +#!/bin/sh + +# +# Copyright © 2015-2021 the original authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# + +############################################################################## +# +# Gradle start up script for POSIX generated by Gradle. +# +# Important for running: +# +# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is +# noncompliant, but you have some other compliant shell such as ksh or +# bash, then to run this script, type that shell name before the whole +# command line, like: +# +# ksh Gradle +# +# Busybox and similar reduced shells will NOT work, because this script +# requires all of these POSIX shell features: +# * functions; +# * expansions «$var», «${var}», «${var:-default}», «${var+SET}», +# «${var#prefix}», «${var%suffix}», and «$( cmd )»; +# * compound commands having a testable exit status, especially «case»; +# * various built-in commands including «command», «set», and «ulimit». +# +# Important for patching: +# +# (2) This script targets any POSIX shell, so it avoids extensions provided +# by Bash, Ksh, etc; in particular arrays are avoided. +# +# The "traditional" practice of packing multiple parameters into a +# space-separated string is a well documented source of bugs and security +# problems, so this is (mostly) avoided, by progressively accumulating +# options in "$@", and eventually passing that to Java. +# +# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS, +# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly; +# see the in-line comments for details. +# +# There are tweaks for specific operating systems such as AIX, CygWin, +# Darwin, MinGW, and NonStop. 
+# +# (3) This script is generated from the Groovy template +# https://github.com/gradle/gradle/blob/HEAD/platforms/jvm/plugins-application/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# within the Gradle project. +# +# You can find Gradle at https://github.com/gradle/gradle/. +# +############################################################################## + +# Attempt to set APP_HOME + +# Resolve links: $0 may be a link +app_path=$0 + +# Need this for daisy-chained symlinks. +while + APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path + [ -h "$app_path" ] +do + ls=$( ls -ld "$app_path" ) + link=${ls#*' -> '} + case $link in #( + /*) app_path=$link ;; #( + *) app_path=$APP_HOME$link ;; + esac +done + +# This is normally unused +# shellcheck disable=SC2034 +APP_BASE_NAME=${0##*/} +# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036) +APP_HOME=$( cd -P "${APP_HOME:-./}" > /dev/null && printf '%s +' "$PWD" ) || exit + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD=maximum + +warn () { + echo "$*" +} >&2 + +die () { + echo + echo "$*" + echo + exit 1 +} >&2 + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "$( uname )" in #( + CYGWIN* ) cygwin=true ;; #( + Darwin* ) darwin=true ;; #( + MSYS* | MINGW* ) msys=true ;; #( + NONSTOP* ) nonstop=true ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD=$JAVA_HOME/jre/sh/java + else + JAVACMD=$JAVA_HOME/bin/java + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD=java + if ! command -v java >/dev/null 2>&1 + then + die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +fi + +# Increase the maximum file descriptors if we can. +if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then + case $MAX_FD in #( + max*) + # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC2039,SC3045 + MAX_FD=$( ulimit -H -n ) || + warn "Could not query maximum file descriptor limit" + esac + case $MAX_FD in #( + '' | soft) :;; #( + *) + # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC2039,SC3045 + ulimit -n "$MAX_FD" || + warn "Could not set maximum file descriptor limit to $MAX_FD" + esac +fi + +# Collect all arguments for the java command, stacking in reverse order: +# * args from the command line +# * the main class name +# * -classpath +# * -D...appname settings +# * --module-path (only if needed) +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables. 
+ +# For Cygwin or MSYS, switch paths to Windows format before running java +if "$cygwin" || "$msys" ; then + APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) + CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" ) + + JAVACMD=$( cygpath --unix "$JAVACMD" ) + + # Now convert the arguments - kludge to limit ourselves to /bin/sh + for arg do + if + case $arg in #( + -*) false ;; # don't mess with options #( + /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath + [ -e "$t" ] ;; #( + *) false ;; + esac + then + arg=$( cygpath --path --ignore --mixed "$arg" ) + fi + # Roll the args list around exactly as many times as the number of + # args, so each arg winds up back in the position where it started, but + # possibly modified. + # + # NB: a `for` loop captures its iteration list before it begins, so + # changing the positional parameters here affects neither the number of + # iterations, nor the values presented in `arg`. + shift # remove old arg + set -- "$@" "$arg" # push replacement arg + done +fi + + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Collect all arguments for the java command: +# * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments, +# and any embedded shellness will be escaped. +# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be +# treated as '${Hostname}' itself on the command line. + +set -- \ + "-Dorg.gradle.appname=$APP_BASE_NAME" \ + -classpath "$CLASSPATH" \ + org.gradle.wrapper.GradleWrapperMain \ + "$@" + +# Stop when "xargs" is not available. +if ! command -v xargs >/dev/null 2>&1 +then + die "xargs is not available" +fi + +# Use "xargs" to parse quoted args. +# +# With -n1 it outputs one arg per line, with the quotes and backslashes removed. +# +# In Bash we could simply go: +# +# readarray ARGS < <( xargs -n1 <<<"$var" ) && +# set -- "${ARGS[@]}" "$@" +# +# but POSIX shell has neither arrays nor command substitution, so instead we +# post-process each arg (as a line of input to sed) to backslash-escape any +# character that might be a shell metacharacter, then use eval to reverse +# that process (while maintaining the separation between arguments), and wrap +# the whole thing up as a single "set" statement. +# +# This will of course break if any of these variables contains a newline or +# an unmatched quote. +# + +eval "set -- $( + printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" | + xargs -n1 | + sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' | + tr '\n' ' ' + )" '"$@"' + +exec "$JAVACMD" "$@" diff --git a/jiceberg_lib/gradlew.bat b/jiceberg_lib/gradlew.bat new file mode 100644 index 0000000..9d21a21 --- /dev/null +++ b/jiceberg_lib/gradlew.bat @@ -0,0 +1,94 @@ +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. 
+@rem +@rem SPDX-License-Identifier: Apache-2.0 +@rem + +@if "%DEBUG%"=="" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%"=="" set DIRNAME=. +@rem This is normally unused +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if %ERRORLEVEL% equ 0 goto execute + +echo. 1>&2 +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. 1>&2 +echo. 1>&2 +echo Please set the JAVA_HOME variable in your environment to match the 1>&2 +echo location of your Java installation. 1>&2 + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto execute + +echo. 1>&2 +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% 1>&2 +echo. 1>&2 +echo Please set the JAVA_HOME variable in your environment to match the 1>&2 +echo location of your Java installation. 1>&2 + +goto fail + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* + +:end +@rem End local scope for the variables with windows NT shell +if %ERRORLEVEL% equ 0 goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +set EXIT_CODE=%ERRORLEVEL% +if %EXIT_CODE% equ 0 set EXIT_CODE=1 +if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE% +exit /b %EXIT_CODE% + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/jiceberg_lib/scripts/copy_headers.sh b/jiceberg_lib/scripts/copy_headers.sh new file mode 100755 index 0000000..6870989 --- /dev/null +++ b/jiceberg_lib/scripts/copy_headers.sh @@ -0,0 +1,18 @@ +#!/bin/sh + +# Make sure we are in the correct folder +working_dir=$(pwd) +if [ "${working_dir%jiceberg_lib}" != "${working_dir}" ]; then + cd scripts +else + if [ "${working_dir%jiceberg_lib/scripts}" == "${working_dir}" ]; then + echo "Please run from jiceberg_lib/ folder" + exit + fi +fi + +echo "Copying header files..." + +rm -rf ../../src/include/jiceberg_generated +mkdir ../../src/include/jiceberg_generated +cp ../app/build/native/nativeCompile/*.h ../../src/include/jiceberg_generated/ diff --git a/jiceberg_lib/settings.gradle b/jiceberg_lib/settings.gradle new file mode 100644 index 0000000..e863d10 --- /dev/null +++ b/jiceberg_lib/settings.gradle @@ -0,0 +1,14 @@ +/* + * This file was generated by the Gradle 'init' task. + * + * The settings file is used to specify which projects to include in your build. + * For more detailed information on multi-project builds, please refer to https://docs.gradle.org/8.9/userguide/multi_project_builds.html in the Gradle documentation. 
+ */
+
+plugins {
+    // Apply the foojay-resolver plugin to allow automatic download of JDKs
+    id 'org.gradle.toolchains.foojay-resolver-convention' version '0.8.0'
+}
+
+rootProject.name = 'jiceberg_lib'
+include('app')
diff --git a/src/catalog_api.cpp b/src/catalog_api.cpp new file mode 100644 index 0000000..f80d033 --- /dev/null +++ b/src/catalog_api.cpp @@ -0,0 +1,467 @@
+#include "catalog_api.hpp"
+#include "catalog_utils.hpp"
+#include "storage/ic_catalog.hpp"
+#include "yyjson.hpp"
+
+#include <curl/curl.h>
+#include <sys/stat.h>
+#include <string>
+#include <algorithm>
+
+using namespace duckdb_yyjson;
+namespace duckdb {
+
+//! We use a global here to store the path that is selected on the ICAPI::InitializeCurl call
+static string SELECTED_CURL_CERT_PATH = "";
+
+static size_t RequestWriteCallback(void *contents, size_t size, size_t nmemb, void *userp) {
+    ((std::string *)userp)->append((char *)contents, size * nmemb);
+    return size * nmemb;
+}
+
+// We statically compile in libcurl, which means the cert file location of the build machine is the
+// place curl will look. But not every distro has this file in the same location, so we search a
+// number of common locations and use the first one we find.
+static string certFileLocations[] = {
+    // Arch, Debian-based, Gentoo
+    "/etc/ssl/certs/ca-certificates.crt",
+    // RedHat 7 based
+    "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem",
+    // RedHat 6 based
+    "/etc/pki/tls/certs/ca-bundle.crt",
+    // OpenSUSE
+    "/etc/ssl/ca-bundle.pem",
+    // Alpine
+    "/etc/ssl/cert.pem"
+};
+
+struct YyjsonDocDeleter {
+    void operator()(yyjson_doc* doc) {
+        yyjson_doc_free(doc);
+    }
+    void operator()(yyjson_mut_doc* doc) {
+        yyjson_mut_doc_free(doc);
+    }
+};
+
+// Look through the above locations and, if one of the files exists, set that as the location curl should use.
+static bool SelectCurlCertPath() {
+    for (string& caFile : certFileLocations) {
+        struct stat buf;
+        if (stat(caFile.c_str(), &buf) == 0) {
+            SELECTED_CURL_CERT_PATH = caFile;
+            // Stop at the first bundle we find, per the comment above.
+            return true;
+        }
+    }
+    return false;
+}
+
+static bool SetCurlCAFileInfo(CURL* curl) {
+    if (!SELECTED_CURL_CERT_PATH.empty()) {
+        curl_easy_setopt(curl, CURLOPT_CAINFO, SELECTED_CURL_CERT_PATH.c_str());
+        return true;
+    }
+    return false;
+}
+
+// Note: every curl object we use should set this, because without it some Linux distros may not find the CA certificate.
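+
+// (Addendum note) When a token is present, the CURLOPT_XOAUTH2_BEARER /
+// CURLAUTH_BEARER pair set below makes libcurl attach the token to every
+// request as a standard OAuth2 header:
+//
+//     Authorization: Bearer <token>
+//
+// This is the form Iceberg REST catalogs (e.g. Polaris) expect once GetToken()
+// has exchanged the client id/secret for an access token.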
+static void InitializeCurlObject(CURL *curl, const string &token) {
+    if (!token.empty()) {
+        curl_easy_setopt(curl, CURLOPT_XOAUTH2_BEARER, token.c_str());
+        curl_easy_setopt(curl, CURLOPT_HTTPAUTH, CURLAUTH_BEARER);
+    }
+    SetCurlCAFileInfo(curl);
+}
+
+template <class TYPE, uint8_t TYPE_NUM, TYPE (*get_function)(yyjson_val *obj)>
+static TYPE TemplatedTryGetYYJson(yyjson_val *obj, const string &field, TYPE default_val,
+                                  bool fail_on_missing = true) {
+    auto val = yyjson_obj_get(obj, field.c_str());
+    if (val && yyjson_get_type(val) == TYPE_NUM) {
+        return get_function(val);
+    } else if (!fail_on_missing) {
+        return default_val;
+    }
+    throw IOException("Invalid field found while parsing field: " + field);
+}
+
+static uint64_t TryGetNumFromObject(yyjson_val *obj, const string &field, bool fail_on_missing = true,
+                                    uint64_t default_val = 0) {
+    return TemplatedTryGetYYJson<uint64_t, YYJSON_TYPE_NUM, yyjson_get_uint>(obj, field, default_val,
+                                                                             fail_on_missing);
+}
+static bool TryGetBoolFromObject(yyjson_val *obj, const string &field, bool fail_on_missing = false,
+                                 bool default_val = false) {
+    return TemplatedTryGetYYJson<bool, YYJSON_TYPE_BOOL, yyjson_get_bool>(obj, field, default_val,
+                                                                          fail_on_missing);
+}
+static string TryGetStrFromObject(yyjson_val *obj, const string &field, bool fail_on_missing = true,
+                                  const char *default_val = "") {
+    return TemplatedTryGetYYJson<const char *, YYJSON_TYPE_STR, yyjson_get_str>(obj, field, default_val,
+                                                                                fail_on_missing);
+}
+
+static string DeleteRequest(const string &url, const string &token = "", curl_slist *extra_headers = NULL) {
+    CURL *curl;
+    CURLcode res;
+    string readBuffer;
+
+    curl = curl_easy_init();
+    if (curl) {
+        curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
+        curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE");
+        curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, RequestWriteCallback);
+        curl_easy_setopt(curl, CURLOPT_WRITEDATA, &readBuffer);
+
+        if (extra_headers) {
+            curl_easy_setopt(curl, CURLOPT_HTTPHEADER, extra_headers);
+        }
+
+        InitializeCurlObject(curl, token);
+        res = curl_easy_perform(curl);
+        curl_easy_cleanup(curl);
+
+        if (res != CURLcode::CURLE_OK) {
+            string error = curl_easy_strerror(res);
+            throw IOException("Curl DELETE Request to '%s' failed with error: '%s'", url, error);
+        }
+
+        return readBuffer;
+    }
+    throw InternalException("Failed to initialize curl");
+}
+
+static string GetRequest(const string &url, const string &token = "", curl_slist *extra_headers = NULL) {
+    CURL *curl;
+    CURLcode res;
+    string readBuffer;
+
+    curl = curl_easy_init();
+    if (curl) {
+        curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
+        curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, RequestWriteCallback);
+        curl_easy_setopt(curl, CURLOPT_WRITEDATA, &readBuffer);
+
+        if (extra_headers) {
+            curl_easy_setopt(curl, CURLOPT_HTTPHEADER, extra_headers);
+        }
+
+        InitializeCurlObject(curl, token);
+        res = curl_easy_perform(curl);
+        curl_easy_cleanup(curl);
+
+        if (res != CURLcode::CURLE_OK) {
+            string error = curl_easy_strerror(res);
+            throw IOException("Curl Request to '%s' failed with error: '%s'", url, error);
+        }
+
+        return readBuffer;
+    }
+    throw InternalException("Failed to initialize curl");
+}
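+
+// Illustrative only: the first POST this helper serves is GetToken()'s OAuth2
+// client-credentials exchange, which looks roughly like
+//
+//     POST {endpoint}/v1/oauth/tokens
+//     Content-Type: application/x-www-form-urlencoded
+//
+//     grant_type=client_credentials&client_id=...&client_secret=...&scope=PRINCIPAL_ROLE:ALL
+//
+// The catalog replies with JSON carrying an "access_token" field, which is then
+// passed as the bearer token on subsequent requests.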
+static string PostRequest(
+    const string &url,
+    const string &post_data,
+    const string &content_type = "x-www-form-urlencoded",
+    const string &token = "",
+    curl_slist *extra_headers = NULL) {
+    string readBuffer;
+    CURL *curl = curl_easy_init();
+    if (!curl) {
+        throw InternalException("Failed to initialize curl");
+    }
+
+    curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
+    curl_easy_setopt(curl, CURLOPT_POST, 1L);
+    curl_easy_setopt(curl, CURLOPT_POSTFIELDS, post_data.c_str());
+    curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, RequestWriteCallback);
+    curl_easy_setopt(curl, CURLOPT_WRITEDATA, &readBuffer);
+
+    // Create default headers for content type
+    struct curl_slist *headers = NULL;
+    const string content_type_str = "Content-Type: application/" + content_type;
+    headers = curl_slist_append(headers, content_type_str.c_str());
+
+    // Append any extra headers
+    if (extra_headers) {
+        struct curl_slist *temp = extra_headers;
+        while (temp) {
+            headers = curl_slist_append(headers, temp->data);
+            temp = temp->next;
+        }
+    }
+
+    curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
+    InitializeCurlObject(curl, token);
+
+    // Perform the request
+    CURLcode res = curl_easy_perform(curl);
+
+    // Clean up
+    curl_slist_free_all(headers);
+    curl_easy_cleanup(curl);
+
+    if (res != CURLcode::CURLE_OK) {
+        string error = curl_easy_strerror(res);
+        throw IOException("Curl Request to '%s' failed with error: '%s'", url, error);
+    }
+    return readBuffer;
+}
+
+static yyjson_doc *api_result_to_doc(const string &api_result) {
+    auto *doc = yyjson_read(api_result.c_str(), api_result.size(), 0);
+    auto *root = yyjson_doc_get_root(doc);
+    auto *error = yyjson_obj_get(root, "error");
+    if (error != NULL) {
+        string err_msg = TryGetStrFromObject(error, "message");
+        throw std::runtime_error(err_msg);
+    }
+    return doc;
+}
+
+static string GetTableMetadata(const string &iceberg_catalog, const string &schema, const string &table, ICCredentials credentials) {
+    struct curl_slist *extra_headers = NULL;
+    extra_headers = curl_slist_append(extra_headers, "X-Iceberg-Access-Delegation: vended-credentials");
+    string api_result = GetRequest(
+        credentials.endpoint + "/v1/" + iceberg_catalog + "/namespaces/" + schema + "/tables/" + table,
+        credentials.token,
+        extra_headers);
+    curl_slist_free_all(extra_headers);
+    return api_result;
+}
+
+void ICAPI::InitializeCurl() {
+    SelectCurlCertPath();
+}
+
+vector<string> ICAPI::GetCatalogs(const string &catalog, ICCredentials credentials) {
+    throw NotImplementedException("ICAPI::GetCatalogs");
+}
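+
+// For reference, each entry of a schema's "fields" array that ParseColumnDefinition
+// consumes looks roughly like (shape as produced by CreateTable below; real catalogs
+// may add more keys):
+//
+//     { "id": 1, "required": false, "name": "l_orderkey", "type": "long" }
+//
+// Precision/scale are only read for decimal columns, via the "type_precision" /
+// "type_scale" keys this catalog surfaces.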
+static ICAPIColumnDefinition ParseColumnDefinition(yyjson_val *column_def) {
+    ICAPIColumnDefinition result;
+    result.name = TryGetStrFromObject(column_def, "name");
+    result.type_text = TryGetStrFromObject(column_def, "type");
+    result.precision = (result.type_text == "decimal") ? TryGetNumFromObject(column_def, "type_precision") : -1;
+    result.scale = (result.type_text == "decimal") ? TryGetNumFromObject(column_def, "type_scale") : -1;
+    result.position = TryGetNumFromObject(column_def, "id") - 1;
+    return result;
+}
+
+ICAPITableCredentials ICAPI::GetTableCredentials(const string &iceberg_catalog, const string &schema, const string &table, ICCredentials credentials) {
+    ICAPITableCredentials result;
+    string api_result = GetTableMetadata(iceberg_catalog, schema, table, credentials);
+    std::unique_ptr<yyjson_doc, YyjsonDocDeleter> doc(api_result_to_doc(api_result));
+    auto *root = yyjson_doc_get_root(doc.get());
+    auto *aws_temp_credentials = yyjson_obj_get(root, "config");
+    if (aws_temp_credentials) {
+        result.key_id = TryGetStrFromObject(aws_temp_credentials, "s3.access-key-id");
+        result.secret = TryGetStrFromObject(aws_temp_credentials, "s3.secret-access-key");
+        result.session_token = TryGetStrFromObject(aws_temp_credentials, "s3.session-token");
+        return result;
+    }
+
+    throw std::runtime_error("No AWS credentials found for table");
+}
+
+string ICAPI::GetToken(string id, string secret, string endpoint) {
+    string post_data = "grant_type=client_credentials&client_id=" + id + "&client_secret=" + secret + "&scope=PRINCIPAL_ROLE:ALL";
+    string api_result = PostRequest(endpoint + "/v1/oauth/tokens", post_data);
+    std::unique_ptr<yyjson_doc, YyjsonDocDeleter> doc(api_result_to_doc(api_result));
+    auto *root = yyjson_doc_get_root(doc.get());
+    return TryGetStrFromObject(root, "access_token");
+}
+
+static void populateTableMetadata(ICAPITable &table, yyjson_val *metadata_root) {
+    table.metadata_location = TryGetStrFromObject(metadata_root, "metadata-location");
+    auto *metadata = yyjson_obj_get(metadata_root, "metadata");
+    //table_result.table_id = TryGetStrFromObject(metadata, "table-uuid");
+
+    uint64_t current_schema_id = TryGetNumFromObject(metadata, "current-schema-id");
+    auto *schemas = yyjson_obj_get(metadata, "schemas");
+    yyjson_val *schema;
+    size_t schema_idx, schema_max;
+    bool found = false;
+    yyjson_arr_foreach(schemas, schema_idx, schema_max, schema) {
+        uint64_t schema_id = TryGetNumFromObject(schema, "schema-id");
+        if (schema_id == current_schema_id) {
+            found = true;
+            auto *columns = yyjson_obj_get(schema, "fields");
+            yyjson_val *col;
+            size_t col_idx, col_max;
+            yyjson_arr_foreach(columns, col_idx, col_max, col) {
+                auto column_definition = ParseColumnDefinition(col);
+                table.columns.push_back(column_definition);
+            }
+        }
+    }
+
+    if (!found) {
+        throw InternalException("Current schema not found");
+    }
+}
+
+static ICAPITable createTable(const string &catalog, const string &schema, const string &table_name) {
+    ICAPITable table_result;
+    table_result.catalog_name = catalog;
+    table_result.schema_name = schema;
+    table_result.name = table_name;
+    table_result.data_source_format = "ICEBERG";
+    table_result.table_id = "uuid-" + schema + "-" + "table";
+    std::replace(table_result.table_id.begin(), table_result.table_id.end(), '_', '-');
+    return table_result;
+}
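+
+// GetTable() below is deliberately lazy: without credentials it returns a stub
+// entry with a single placeholder column ("__") and defers the metadata fetch
+// until the table is actually accessed. GetTables() relies on this so that
+// listing a schema costs one HTTP request rather than one per table.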
"int"; + col.precision = -1; + col.scale = -1; + col.position = 0; + table_result.columns.push_back(col); + } + + return table_result; +} + +// TODO: handle out-of-order columns using position property +vector ICAPI::GetTables(const string &catalog, const string &iceberg_catalog, const string &schema, ICCredentials credentials) { + vector result; + string api_result = GetRequest(credentials.endpoint + "/v1/" + iceberg_catalog + "/namespaces/" + schema + "/tables", credentials.token); + std::unique_ptr doc(api_result_to_doc(api_result)); + auto *root = yyjson_doc_get_root(doc.get()); + auto *tables = yyjson_obj_get(root, "identifiers"); + size_t idx, max; + yyjson_val *table; + yyjson_arr_foreach(tables, idx, max, table) { + auto table_result = GetTable(catalog, iceberg_catalog, schema, TryGetStrFromObject(table, "name"), std::nullopt); + result.push_back(table_result); + } + + return result; +} + +vector ICAPI::GetSchemas(const string &catalog, const string &iceberg_catalog, ICCredentials credentials) { + vector result; + string api_result = + GetRequest(credentials.endpoint + "/v1/" + iceberg_catalog + "/namespaces", credentials.token); + std::unique_ptr doc(api_result_to_doc(api_result)); + auto *root = yyjson_doc_get_root(doc.get()); + auto *schemas = yyjson_obj_get(root, "namespaces"); + size_t idx, max; + yyjson_val *schema; + yyjson_arr_foreach(schemas, idx, max, schema) { + ICAPISchema schema_result; + schema_result.catalog_name = catalog; + schema_result.schema_name = yyjson_get_str(yyjson_arr_get(schema, 0)); + schema_result.iceberg_catalog = iceberg_catalog; + result.push_back(schema_result); + } + + return result; +} + +ICAPISchema ICAPI::CreateSchema(const string &catalog, const string &iceberg_catalog, const string &schema, ICCredentials credentials) { + string post_data = "{\"namespace\":[\"" + schema + "\"]}"; + string api_result = PostRequest( + credentials.endpoint + "/v1/" + iceberg_catalog + "/namespaces", post_data, "json", credentials.token); + api_result_to_doc(api_result); // if the method returns, request was successful + + ICAPISchema schema_result; + schema_result.catalog_name = catalog; + schema_result.schema_name = schema; + schema_result.iceberg_catalog = iceberg_catalog; + return schema_result; +} + +void ICAPI::DropSchema(const string &iceberg_catalog, const string &schema, ICCredentials credentials) { + string api_result = DeleteRequest( + credentials.endpoint + "/v1/" + iceberg_catalog + "/namespaces/" + schema, credentials.token); + api_result_to_doc(api_result); // if the method returns, request was successful +} + +void ICAPI::DropTable(const string &catalog, const string &iceberg_catalog, const string &schema, string &table_name, ICCredentials credentials) { + string api_result = DeleteRequest( + credentials.endpoint + "/v1/" + iceberg_catalog + "/namespaces/" + schema + "/tables/" + table_name + "?purgeRequested=true", + credentials.token); + api_result_to_doc(api_result); // if the method returns, request was successful +} + +static std::string json_to_string(yyjson_mut_doc *doc) { + size_t len = 0; + yyjson_write_flag flags = YYJSON_WRITE_NOFLAG; + char *json_chars = yyjson_mut_write(doc, flags, &len); + std::string json_str(json_chars, len); + free(json_chars); + return json_str; +} + +ICAPITable ICAPI::CreateTable(const string &catalog, const string &iceberg_catalog, const string &schema, ICCredentials &credentials, CreateTableInfo *table_info) { + std::unique_ptr dd(yyjson_mut_doc_new(NULL)); + yyjson_mut_doc *doc = dd.get(); + + yyjson_mut_val 
*rr = yyjson_mut_obj(doc); + yyjson_mut_doc_set_root(doc, rr); + yyjson_mut_obj_add_str(doc, rr, "name", table_info->table.c_str()); + + yyjson_mut_val *sch = yyjson_mut_obj(doc); + yyjson_mut_obj_add_val(doc, rr, "schema", sch); + yyjson_mut_obj_add_str(doc, sch, "type", "struct"); + + yyjson_mut_val *fields = yyjson_mut_arr(doc); + yyjson_mut_obj_add_val(doc, sch, "fields", fields); + + std::vector column_names; + std::vector column_types; + size_t num_columns = table_info->columns.LogicalColumnCount(); + column_names.reserve(num_columns); + column_types.reserve(num_columns); + for (auto &col : table_info->columns.Logical()) { + // Store column name and type in vectors + column_names.push_back(col.GetName()); + column_types.push_back(ICUtils::LogicalToIcebergType(col.GetType())); + // Add column object to JSON + yyjson_mut_val *col_obj = yyjson_mut_obj(doc); + yyjson_mut_obj_add_int(doc, col_obj, "id", col.Oid()); + yyjson_mut_obj_add_bool(doc, col_obj, "required", false); + yyjson_mut_obj_add_str(doc, col_obj, "name", column_names.back().c_str()); + yyjson_mut_obj_add_str(doc, col_obj, "type", column_types.back().c_str()); + yyjson_mut_arr_add_val(fields, col_obj); + } + + yyjson_mut_val *props = yyjson_mut_obj(doc); + yyjson_mut_obj_add_val(doc, rr, "properties", props); + yyjson_mut_obj_add_str(doc, props, "write.parquet.compression-codec", "snappy"); + yyjson_mut_obj_add_str(doc, props, "format-version", "2"); // Important for newer Iceberg features + yyjson_mut_obj_add_str(doc, props, "write.metadata.compression-codec", "none"); // Ensure metadata writes work + yyjson_mut_obj_add_str(doc, props, "write.metadata.metrics.default", "full"); // Complete metadata + yyjson_mut_obj_add_str(doc, props, "write.metadata.delete-after-commit.enabled", "true"); + yyjson_mut_obj_add_str(doc, props, "write.metadata.previous-versions-max", "1"); + + string post_data = json_to_string(doc); + + struct curl_slist *extra_headers = NULL; + extra_headers = curl_slist_append(extra_headers, "X-Iceberg-Access-Delegation: vended-credentials"); + string api_result = PostRequest( + credentials.endpoint + "/v1/" + iceberg_catalog + "/namespaces/" + schema + "/tables", post_data, "json", credentials.token, extra_headers); + curl_slist_free_all(extra_headers); + + std::unique_ptr dd2(api_result_to_doc(api_result)); + auto *root = yyjson_doc_get_root(dd2.get()); + ICAPITable table_result = createTable(catalog, schema, table_info->table); + populateTableMetadata(table_result, root); + return table_result; +} + +} // namespace duckdb diff --git a/src/catalog_utils.cpp b/src/catalog_utils.cpp new file mode 100644 index 0000000..906bf92 --- /dev/null +++ b/src/catalog_utils.cpp @@ -0,0 +1,234 @@ +#include "catalog_utils.hpp" +#include "duckdb/common/operator/cast_operators.hpp" +#include "storage/ic_schema_entry.hpp" +#include "storage/ic_transaction.hpp" + +#include + +namespace duckdb { + +string ICUtils::LogicalToIcebergType(const LogicalType &input) { + switch (input.id()) { + case LogicalType::TINYINT: + case LogicalType::UTINYINT: + return "tinyint"; + case LogicalType::SMALLINT: + case LogicalType::USMALLINT: + return "smallint"; + case LogicalType::INTEGER: + case LogicalType::UINTEGER: + return "int"; + case LogicalType::BIGINT: + case LogicalType::UBIGINT: + return "long"; + case LogicalType::VARCHAR: + return "string"; + case LogicalType::DOUBLE: + return "double"; + case LogicalType::FLOAT: + return "float"; + case LogicalType::BOOLEAN: + return "boolean"; + case LogicalType::TIMESTAMP: + return 
"timestamp"; + case LogicalType::TIMESTAMP_TZ: + return "timestamptz"; + case LogicalType::BLOB: + return "binary"; + case LogicalType::DATE: + return "date"; + case LogicalTypeId::DECIMAL: + uint8_t precision = DecimalType::GetWidth(input); + uint8_t scale = DecimalType::GetScale(input); + return "decimal(" + std::to_string(precision) + ", " + std::to_string(scale) + ")"; + // case LogicalTypeId::ARRAY: + // case LogicalTypeId::STRUCT: + // case LogicalTypeId::MAP: + // default: + } + + throw std::runtime_error("Unsupported type: " + input.ToString()); +} + +string ICUtils::TypeToString(const LogicalType &input) { + switch (input.id()) { + case LogicalType::VARCHAR: + return "TEXT"; + case LogicalType::UTINYINT: + return "TINYINT UNSIGNED"; + case LogicalType::USMALLINT: + return "SMALLINT UNSIGNED"; + case LogicalType::UINTEGER: + return "INTEGER UNSIGNED"; + case LogicalType::UBIGINT: + return "BIGINT UNSIGNED"; + case LogicalType::TIMESTAMP: + return "DATETIME"; + case LogicalType::TIMESTAMP_TZ: + return "TIMESTAMP"; + default: + return input.ToString(); + } +} + +LogicalType ICUtils::TypeToLogicalType(const string &type_text) { + if (type_text == "tinyint") { + return LogicalType::TINYINT; + } else if (type_text == "smallint") { + return LogicalType::SMALLINT; + } else if (type_text == "bigint") { + return LogicalType::BIGINT; + } else if (type_text == "int") { + return LogicalType::INTEGER; + } else if (type_text == "long") { + return LogicalType::BIGINT; + } else if (type_text == "string") { + return LogicalType::VARCHAR; + } else if (type_text == "double") { + return LogicalType::DOUBLE; + } else if (type_text == "float") { + return LogicalType::FLOAT; + } else if (type_text == "boolean") { + return LogicalType::BOOLEAN; + } else if (type_text == "timestamp") { + return LogicalType::TIMESTAMP; + } else if (type_text == "timestamptz") { + return LogicalType::TIMESTAMP_TZ; + } else if (type_text == "binary") { + return LogicalType::BLOB; + } else if (type_text == "date") { + return LogicalType::DATE; + } else if (type_text.find("decimal(") == 0) { + size_t spec_end = type_text.find(')'); + if (spec_end != string::npos) { + size_t sep = type_text.find(','); + auto prec_str = type_text.substr(8, sep - 8); + auto scale_str = type_text.substr(sep + 1, spec_end - sep - 1); + uint8_t prec = Cast::Operation(prec_str); + uint8_t scale = Cast::Operation(scale_str); + return LogicalType::DECIMAL(prec, scale); + } + } else if (type_text.find("array<") == 0) { + size_t type_end = type_text.rfind('>'); // find last, to deal with nested + if (type_end != string::npos) { + auto child_type_str = type_text.substr(6, type_end - 6); + auto child_type = ICUtils::TypeToLogicalType(child_type_str); + return LogicalType::LIST(child_type); + } + } else if (type_text.find("map<") == 0) { + size_t type_end = type_text.rfind('>'); // find last, to deal with nested + if (type_end != string::npos) { + // TODO: Factor this and struct parsing into an iterator over ',' separated values + vector key_val; + size_t cur = 4; + auto nested_opens = 0; + for (;;) { + size_t next_sep = cur; + // find the location of the next ',' ignoring nested commas + while (type_text[next_sep] != ',' || nested_opens > 0) { + if (type_text[next_sep] == '<') { + nested_opens++; + } else if (type_text[next_sep] == '>') { + nested_opens--; + } + next_sep++; + if (next_sep == type_end) { + break; + } + } + auto child_str = type_text.substr(cur, next_sep - cur); + auto child_type = ICUtils::TypeToLogicalType(child_str); + 
+    } else if (type_text.find("map<") == 0) {
+        size_t type_end = type_text.rfind('>'); // find last, to deal with nested
+        if (type_end != string::npos) {
+            // TODO: Factor this and struct parsing into an iterator over ',' separated values
+            vector<LogicalType> key_val;
+            size_t cur = 4;
+            auto nested_opens = 0;
+            for (;;) {
+                size_t next_sep = cur;
+                // find the location of the next ',' ignoring nested commas
+                while (type_text[next_sep] != ',' || nested_opens > 0) {
+                    if (type_text[next_sep] == '<') {
+                        nested_opens++;
+                    } else if (type_text[next_sep] == '>') {
+                        nested_opens--;
+                    }
+                    next_sep++;
+                    if (next_sep == type_end) {
+                        break;
+                    }
+                }
+                auto child_str = type_text.substr(cur, next_sep - cur);
+                auto child_type = ICUtils::TypeToLogicalType(child_str);
+                key_val.push_back(child_type);
+                if (next_sep == type_end) {
+                    break;
+                }
+                cur = next_sep + 1;
+            }
+            if (key_val.size() != 2) {
+                throw NotImplementedException("Invalid map specification with %i types", key_val.size());
+            }
+            return LogicalType::MAP(key_val[0], key_val[1]);
+        }
+    } else if (type_text.find("struct<") == 0) {
+        size_t type_end = type_text.rfind('>'); // find last, to deal with nested
+        if (type_end != string::npos) {
+            child_list_t<LogicalType> children;
+            size_t cur = 7;
+            auto nested_opens = 0;
+            for (;;) {
+                size_t next_sep = cur;
+                // find the location of the next ',' ignoring nested commas
+                while (type_text[next_sep] != ',' || nested_opens > 0) {
+                    if (type_text[next_sep] == '<') {
+                        nested_opens++;
+                    } else if (type_text[next_sep] == '>') {
+                        nested_opens--;
+                    }
+                    next_sep++;
+                    if (next_sep == type_end) {
+                        break;
+                    }
+                }
+                auto child_str = type_text.substr(cur, next_sep - cur);
+                size_t type_sep = child_str.find(':');
+                if (type_sep == string::npos) {
+                    throw NotImplementedException("Invalid struct child type specifier: %s", child_str);
+                }
+                auto child_name = child_str.substr(0, type_sep);
+                auto child_type = ICUtils::TypeToLogicalType(child_str.substr(type_sep + 1, string::npos));
+                children.push_back({child_name, child_type});
+                if (next_sep == type_end) {
+                    break;
+                }
+                cur = next_sep + 1;
+            }
+            return LogicalType::STRUCT(children);
+        }
+    }
+
+    throw NotImplementedException("Tried to fallback to unknown type for '%s'", type_text);
+}
+
+LogicalType ICUtils::ToUCType(const LogicalType &input) {
+    // todo do we need this mapping?
+    throw NotImplementedException("ToUCType not yet implemented");
+    switch (input.id()) {
+    case LogicalTypeId::BOOLEAN:
+    case LogicalTypeId::SMALLINT:
+    case LogicalTypeId::INTEGER:
+    case LogicalTypeId::BIGINT:
+    case LogicalTypeId::TINYINT:
+    case LogicalTypeId::UTINYINT:
+    case LogicalTypeId::USMALLINT:
+    case LogicalTypeId::UINTEGER:
+    case LogicalTypeId::UBIGINT:
+    case LogicalTypeId::FLOAT:
+    case LogicalTypeId::DOUBLE:
+    case LogicalTypeId::BLOB:
+    case LogicalTypeId::DATE:
+    case LogicalTypeId::DECIMAL:
+    case LogicalTypeId::TIMESTAMP:
+    case LogicalTypeId::TIMESTAMP_TZ:
+    case LogicalTypeId::VARCHAR:
+        return input;
+    case LogicalTypeId::LIST:
+        throw NotImplementedException("PC does not support arrays - unsupported type \"%s\"", input.ToString());
+    case LogicalTypeId::STRUCT:
+    case LogicalTypeId::MAP:
+    case LogicalTypeId::UNION:
+        throw NotImplementedException("PC does not support composite types - unsupported type \"%s\"",
+                                      input.ToString());
+    case LogicalTypeId::TIMESTAMP_SEC:
+    case LogicalTypeId::TIMESTAMP_MS:
+    case LogicalTypeId::TIMESTAMP_NS:
+        return LogicalType::TIMESTAMP;
+    case LogicalTypeId::HUGEINT:
+        return LogicalType::DOUBLE;
+    default:
+        return LogicalType::VARCHAR;
+    }
+}
+
+} // namespace duckdb
diff --git a/src/ic_create_table_as_op.cpp b/src/ic_create_table_as_op.cpp
new file mode 100644
index 0000000..1510ac8
--- /dev/null
+++ b/src/ic_create_table_as_op.cpp
@@ -0,0 +1,226 @@
+
+#include "duckdb.hpp"
+#include "duckdb/execution/physical_operator.hpp"
+#include "duckdb/parser/parsed_data/drop_info.hpp"
+#include "duckdb/planner/logical_operator.hpp"
+#include "duckdb/storage/data_table.hpp"
+#include "parquet/arrow/writer.h"
+#include "parquet/properties.h"
+
+#include "arrow/api.h"
+#include "arrow/io/api.h"
+#include "ic_create_table_as_op.hpp"
+#include "jiceberg_generated/libjiceberg.h"
+#include "jiceberg_generated/graal_isolate.h"
+
+namespace duckdb {
+
+template <typename T, typename ArrowType>
+void ProcessColumnData(const T *data_ptr,
+                       arrow::NumericBuilder<ArrowType> &builder,
+                       const UnifiedVectorFormat &vdata,
+                       std::shared_ptr<arrow::Array> &arr,
+                       idx_t num_rows) {
+
+    auto &validity = vdata.validity;
+    auto sel = vdata.sel;
+    for (idx_t row_idx = 0; row_idx < num_rows; row_idx++) {
+        idx_t source_idx = sel->get_index(row_idx);
+        if (validity.RowIsValid(source_idx)) {
+            if (!(builder.Append(data_ptr[source_idx])).ok()) {
+                throw std::runtime_error("Failed to append data");
+            }
+        } else {
+            if (!(builder.AppendNull()).ok()) {
+                throw std::runtime_error("Failed to append NULL");
+            }
+        }
+    }
+
+    if (!(builder.Finish(&arr)).ok()) {
+        throw std::runtime_error("Failed to finalize Arrow array");
+    }
+}
+
+std::shared_ptr<arrow::Table> DataChunkToArrowTable(duckdb::DataChunk &chunk, ColumnList &columnList) {
+    namespace ar = arrow;
+    using namespace duckdb;
+
+    int num_columns = chunk.ColumnCount();
+    int64_t num_rows = chunk.size();
+
+    // Prepare to build schema (one Arrow field per DuckDB column)
+    std::vector<std::shared_ptr<ar::Field>> fields;
+    fields.reserve(num_columns);
+
+    // Prepare a container for the final Arrow arrays (one per column)
+    std::vector<std::shared_ptr<ar::Array>> arrays;
+    arrays.reserve(num_columns);
+
+    for (int col_idx = 0; col_idx < num_columns; col_idx++) {
+        auto &col_vector = chunk.data[col_idx];
+        auto &duck_type = col_vector.GetType();
+        string column_name = columnList.GetColumnNames()[col_idx];
+        // We'll unify the vector so we can safely read data regardless of whether it's flat, constant, dictionary, etc.
+        UnifiedVectorFormat vdata;
+        col_vector.ToUnifiedFormat(num_rows, vdata);
+        // A selection vector mapping row_idx -> actual index in `vdata.data`
+        auto sel = vdata.sel;
+        // Validity (null) mask for each row
+        auto &validity = vdata.validity;
+
+        auto metadata = std::make_shared<ar::KeyValueMetadata>();
+        metadata->Append("PARQUET:field_id", std::to_string(col_idx + 1));
+
+        // TODO: Handle other types as needed: LogicalTypeId::FLOAT, ::BOOLEAN, ::DATE, etc.
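+        // Iceberg readers resolve Parquet columns by field id rather than by name,
+        // which is why each Arrow field carries "PARQUET:field_id" metadata (1-based
+        // here); this is expected to line up with the field ids registered in the
+        // catalog's table schema at CREATE TABLE time.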
+        switch (duck_type.id()) {
+        case LogicalTypeId::INTEGER: {
+            fields.push_back(
+                ar::field(column_name, ar::int32())->WithMetadata(metadata));
+            auto data_ptr = reinterpret_cast<const int32_t *>(vdata.data);
+            ar::Int32Builder builder;
+            std::shared_ptr<ar::Array> arr;
+            ProcessColumnData(data_ptr, builder, vdata, arr, num_rows);
+            arrays.push_back(arr);
+            break;
+        }
+
+        case LogicalTypeId::BIGINT: {
+            fields.push_back(ar::field(column_name, ar::int64())->WithMetadata(metadata));
+            auto data_ptr = reinterpret_cast<const int64_t *>(vdata.data);
+            ar::Int64Builder builder;
+            std::shared_ptr<ar::Array> arr;
+            ProcessColumnData(data_ptr, builder, vdata, arr, num_rows);
+            arrays.push_back(arr);
+            break;
+        }
+
+        case LogicalTypeId::DOUBLE: {
+            fields.push_back(ar::field(column_name, ar::float64())->WithMetadata(metadata));
+            auto data_ptr = reinterpret_cast<const double *>(vdata.data);
+            ar::DoubleBuilder builder;
+            std::shared_ptr<ar::Array> arr;
+            ProcessColumnData(data_ptr, builder, vdata, arr, num_rows);
+            arrays.push_back(arr);
+            break;
+        }
+
+        case LogicalTypeId::VARCHAR: {
+            fields.push_back(ar::field(column_name, ar::utf8())->WithMetadata(metadata));
+            ar::StringBuilder builder;
+            for (idx_t row_idx = 0; row_idx < num_rows; row_idx++) {
+                idx_t source_idx = sel->get_index(row_idx);
+                if (validity.RowIsValid(source_idx)) {
+                    auto val = col_vector.GetValue(source_idx);
+                    if (!(builder.Append(val.ToString())).ok()) {
+                        throw std::runtime_error("Failed to append string data");
+                    }
+                } else {
+                    if (!(builder.AppendNull()).ok()) {
+                        throw std::runtime_error("Failed to append NULL");
+                    }
+                }
+            }
+            std::shared_ptr<ar::Array> arr;
+            if (!(builder.Finish(&arr)).ok()) {
+                throw std::runtime_error("Failed to build string Arrow array");
+            }
+            arrays.push_back(arr);
+            break;
+        }
+
+        default:
+            throw std::runtime_error("Unsupported type: " + duck_type.ToString() + " for column " + column_name);
+        }
+    }
+
+    // Build a final Arrow schema and table
+    auto schema = std::make_shared<ar::Schema>(fields);
+    return ar::Table::Make(schema, arrays, num_rows);
+}
+
+OperatorResultType ICCreateTableAsOp::Execute(ExecutionContext &context,
+                                              DataChunk &input,
+                                              DataChunk &chunk,
+                                              GlobalOperatorState &gstate,
+                                              OperatorState &state) const {
+    auto *table_info = dynamic_cast<CreateTableInfo *>(info->base.get());
+
+    // Create arrow table to write to parquet file
+    auto arrow_table = DataChunkToArrowTable(input, table_info->columns);
+
+    // Create a file output stream
+    const std::string datafile_filename =
+        schemaEntry->schema_data->iceberg_catalog + "." + table_info->schema + "." + table_info->table + ".parquet";
+    auto open_result = arrow::io::FileOutputStream::Open(datafile_filename);
+
+    if (!open_result.ok()) {
+        throw std::runtime_error("Failed to open file: " + open_result.status().ToString());
+    }
+
+    std::shared_ptr<arrow::io::FileOutputStream> datafile = *open_result;
+    parquet::WriterProperties::Builder builder;
+    builder.compression(parquet::Compression::SNAPPY);
+    builder.version(parquet::ParquetVersion::PARQUET_1_0);
+    std::shared_ptr<parquet::WriterProperties> writer_props = builder.build();
+    arrow::Status write_status = parquet::arrow::WriteTable(
+        *arrow_table,
+        arrow::default_memory_pool(),
+        datafile,
+        1024, // TODO: Is this a good chunk_size?
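+        // chunk_size caps the number of rows per Parquet row group; 1024 is small
+        // but safe, and larger row groups generally compress and scan better.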
+ writer_props + ); + + if (!write_status.ok()) { + throw std::runtime_error("Error writing to Parquet: " + write_status.ToString()); + } + + // Create the table at the destination + auto transaction = schemaEntry->catalog.GetCatalogTransaction(context.client); + auto entry = schemaEntry->CreateTable(transaction, *info); + + // Initialize the GraalVM isolate + graal_isolate_t* isolate = nullptr; + graal_isolatethread_t* thread = nullptr; + if (graal_create_isolate(nullptr, &isolate, &thread) != 0) { + throw std::runtime_error("Failed to create GraalVM isolate"); + } + + string creds = table_credentials.client_id + ":" + table_credentials.client_secret; + char *result = nullptr; + try { + result = append_to_table( + thread, + const_cast(table_credentials.endpoint.c_str()), + const_cast(creds.c_str()), + const_cast(schemaEntry->schema_data->iceberg_catalog.c_str()), + const_cast(table_info->schema.c_str()), + const_cast(table_info->table.c_str()), + const_cast(datafile_filename.c_str()), + chunk.size()); + + } catch (...) { + DropInfo drop_info; + drop_info.type = CatalogType::TABLE_ENTRY; + drop_info.catalog = info->base->catalog; + drop_info.schema = table_info->schema; + drop_info.name = table_info->table; + schemaEntry->DropEntry(context.client, drop_info); + } + + // Clean up the GraalVM isolate + if (graal_detach_thread(thread) != 0) { + throw std::runtime_error("Failed to detach GraalVM thread"); + } + + if (result == nullptr) { + throw std::runtime_error("Failed to append to table"); + } + + // Update the table entry with the new metadata location + auto table_entry = dynamic_cast(entry.get()); + table_entry->table_data->metadata_location = result; + + // Set output chunk to empty + chunk.SetCardinality(0); + + return duckdb::OperatorResultType::FINISHED; +} + +} // namespace duckdb \ No newline at end of file diff --git a/src/iceberg_extension.cpp b/src/iceberg_extension.cpp index f82c6b4..2838ff4 100644 --- a/src/iceberg_extension.cpp +++ b/src/iceberg_extension.cpp @@ -1,20 +1,165 @@ #define DUCKDB_EXTENSION_MAIN #include "iceberg_extension.hpp" +#include "storage/ic_catalog.hpp" +#include "storage/ic_transaction_manager.hpp" #include "duckdb.hpp" +#include "duckdb/main/secret/secret_manager.hpp" #include "duckdb/common/exception.hpp" #include "duckdb/common/string_util.hpp" #include "duckdb/function/scalar_function.hpp" +#include "duckdb/main/extension_util.hpp" #include "duckdb/catalog/catalog_entry/macro_catalog_entry.hpp" #include "duckdb/catalog/default/default_functions.hpp" +#include "duckdb/parser/parsed_data/create_scalar_function_info.hpp" +#include "duckdb/parser/parsed_data/attach_info.hpp" +#include "duckdb/storage/storage_extension.hpp" #include "iceberg_functions.hpp" #include "yyjson.hpp" #include "duckdb/main/extension_util.hpp" #include +#include "catalog_api.hpp" namespace duckdb { +static unique_ptr CreateCatalogSecretFunction(ClientContext &, CreateSecretInput &input) { + // apply any overridden settings + vector prefix_paths; + auto result = make_uniq(prefix_paths, "iceberg", "config", input.name); + + for (const auto &named_param : input.options) { + auto lower_name = StringUtil::Lower(named_param.first); + + if (lower_name == "client_id" || + lower_name == "client_secret" || + lower_name == "endpoint" || + lower_name == "aws_region") { + result->secret_map[lower_name] = named_param.second.ToString(); + } else { + throw InternalException("Unknown named parameter passed to CreateUCSecretFunction: " + lower_name); + } + } + + // Get token from catalog + 
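+    // ICAPI::GetToken runs the OAuth2 client_credentials flow against the catalog's
+    // /v1/oauth/tokens endpoint; the resulting bearer token is stored (redacted) in
+    // the secret and reused for all subsequent REST calls.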
+    result->secret_map["token"] = ICAPI::GetToken(
+        result->secret_map["client_id"].ToString(),
+        result->secret_map["client_secret"].ToString(),
+        result->secret_map["endpoint"].ToString());
+
+    //! Set redact keys
+    result->redact_keys = {"token", "client_id", "client_secret"};
+
+    return std::move(result);
+}
+
+static void SetCatalogSecretParameters(CreateSecretFunction &function) {
+    function.named_parameters["client_id"] = LogicalType::VARCHAR;
+    function.named_parameters["client_secret"] = LogicalType::VARCHAR;
+    function.named_parameters["endpoint"] = LogicalType::VARCHAR;
+    function.named_parameters["aws_region"] = LogicalType::VARCHAR;
+    function.named_parameters["token"] = LogicalType::VARCHAR;
+}
+
+unique_ptr<SecretEntry> GetSecret(ClientContext &context, const string &secret_name) {
+    auto &secret_manager = SecretManager::Get(context);
+    auto transaction = CatalogTransaction::GetSystemCatalogTransaction(context);
+    // FIXME: this should be adjusted once the `GetSecretByName` API supports this
+    // use case
+    auto secret_entry = secret_manager.GetSecretByName(transaction, secret_name, "memory");
+    if (secret_entry) {
+        return secret_entry;
+    }
+    secret_entry = secret_manager.GetSecretByName(transaction, secret_name, "local_file");
+    if (secret_entry) {
+        return secret_entry;
+    }
+    return nullptr;
+}
+
+static unique_ptr<Catalog> IcebergCatalogAttach(StorageExtensionInfo *storage_info, ClientContext &context,
+                                                AttachedDatabase &db, const string &name, AttachInfo &info,
+                                                AccessMode access_mode) {
+    ICCredentials credentials;
+
+    // check if we have a secret provided
+    string secret_name;
+    for (auto &entry : info.options) {
+        auto lower_name = StringUtil::Lower(entry.first);
+        if (lower_name == "type" || lower_name == "read_only") {
+            // already handled
+        } else if (lower_name == "secret") {
+            secret_name = entry.second.ToString();
+        } else {
+            throw BinderException("Unrecognized option for iceberg attach: %s", entry.first);
+        }
+    }
+
+    // if no secret is specified we default to the unnamed iceberg secret, if it
+    // exists
+    bool explicit_secret = !secret_name.empty();
+    if (!explicit_secret) {
+        // look up settings from the default unnamed iceberg secret if none is
+        // provided
+        secret_name = "__default_iceberg";
+    }
+
+    string connection_string = info.path;
+    auto secret_entry = GetSecret(context, secret_name);
+    if (secret_entry) {
+        // secret found - read data
+        const auto &kv_secret = dynamic_cast<const KeyValueSecret &>(*secret_entry->secret);
+        string new_connection_info;
+
+        Value client_id = kv_secret.TryGetValue("client_id");
+        if (client_id.IsNull()) {
+            throw std::runtime_error("CLIENT_ID is blank");
+        }
+        credentials.client_id = client_id.ToString();
+
+        Value client_secret = kv_secret.TryGetValue("client_secret");
+        if (client_secret.IsNull()) {
+            throw std::runtime_error("CLIENT_SECRET is blank");
+        }
+        credentials.client_secret = client_secret.ToString();
+
+        Value token_val = kv_secret.TryGetValue("token");
+        if (token_val.IsNull()) {
+            throw std::runtime_error("TOKEN is blank");
+        }
+        credentials.token = token_val.ToString();
+
+        Value endpoint_val = kv_secret.TryGetValue("endpoint");
+        credentials.endpoint = endpoint_val.IsNull() ? "" : endpoint_val.ToString();
+        StringUtil::RTrim(credentials.endpoint, "/");
+
+        Value aws_region_val = kv_secret.TryGetValue("aws_region");
+        credentials.aws_region = aws_region_val.IsNull() ? 
"" : aws_region_val.ToString(); + + } else if (explicit_secret) { + // secret not found and one was explicitly provided - throw an error + throw BinderException("Secret with name \"%s\" not found", secret_name); + } + + // TODO: Check catalog with name actually exists! + + return make_uniq(db, info.path, access_mode, credentials); +} + +static unique_ptr CreateTransactionManager(StorageExtensionInfo *storage_info, AttachedDatabase &db, + Catalog &catalog) { + auto &ic_catalog = catalog.Cast(); + return make_uniq(db, ic_catalog); +} + +class ICCatalogStorageExtension : public StorageExtension { +public: + ICCatalogStorageExtension() { + attach = IcebergCatalogAttach; + create_transaction_manager = CreateTransactionManager; + } +}; + static void LoadInternal(DatabaseInstance &instance) { auto &config = DBConfig::GetConfig(instance); @@ -34,6 +179,20 @@ static void LoadInternal(DatabaseInstance &instance) { for (auto &fun : IcebergFunctions::GetScalarFunctions()) { ExtensionUtil::RegisterFunction(instance, fun); } + + ICAPI::InitializeCurl(); + + SecretType secret_type; + secret_type.name = "iceberg"; + secret_type.deserializer = KeyValueSecret::Deserialize; + secret_type.default_provider = "config"; + + ExtensionUtil::RegisterSecretType(instance, secret_type); + CreateSecretFunction secret_function = {"iceberg", "config", CreateCatalogSecretFunction}; + SetCatalogSecretParameters(secret_function); + ExtensionUtil::RegisterFunction(instance, secret_function); + + config.storage_extensions["iceberg"] = make_uniq(); } void IcebergExtension::Load(DuckDB &db) { diff --git a/src/include/catalog_api.hpp b/src/include/catalog_api.hpp new file mode 100644 index 0000000..f6c412e --- /dev/null +++ b/src/include/catalog_api.hpp @@ -0,0 +1,61 @@ + +#pragma once + +#include "duckdb/common/types.hpp" +#include "duckdb/parser/parsed_data/create_table_info.hpp" + +namespace duckdb { +struct ICCredentials; + +struct ICAPIColumnDefinition { + string name; + string type_text; + idx_t precision; + idx_t scale; + idx_t position; +}; + +struct ICAPITable { + string table_id; + + string name; + string catalog_name; + string schema_name; + string table_type; + string data_source_format; + string metadata_location; + + vector columns; +}; + +struct ICAPISchema { + string schema_name; + string catalog_name; + string iceberg_catalog; +}; + +struct ICAPITableCredentials { + string key_id; + string secret; + string session_token; +}; + +class ICAPI { +public: + //! WARNING: not thread-safe. 
To be called once on extension initialization + static void InitializeCurl(); + + static ICAPITableCredentials GetTableCredentials(const string &iceberg_catalog, const string &schema, const string &table, ICCredentials credentials); + static vector GetCatalogs(const string &catalog, ICCredentials credentials); + static vector GetTables(const string &catalog, const string &iceberg_catalog, const string &schema, ICCredentials credentials); + static ICAPITable GetTable(const string &catalog, const string &iceberg_catalog, const string &schema, const string &table_name, std::optional credentials); + static vector GetSchemas(const string &catalog, const string &iceberg_catalog, ICCredentials credentials); + static vector GetTablesInSchema(const string &catalog, const string &schema, ICCredentials credentials); + static string GetToken(string id, string secret, string endpoint); + static ICAPISchema CreateSchema(const string &catalog, const string &iceberg_catalog, const string &schema, ICCredentials credentials); + static void DropSchema(const string &iceberg_catalog, const string &schema, ICCredentials credentials); + static ICAPITable CreateTable(const string &catalog, const string &iceberg_catalog, const string &schema, ICCredentials &credentials, CreateTableInfo *table_info); + static void DropTable(const string &catalog, const string &iceberg_catalog, const string &schema, string &table_name, ICCredentials credentials); +}; + +} // namespace duckdb diff --git a/src/include/catalog_utils.hpp b/src/include/catalog_utils.hpp new file mode 100644 index 0000000..75186d3 --- /dev/null +++ b/src/include/catalog_utils.hpp @@ -0,0 +1,27 @@ + +#pragma once + +#include "duckdb.hpp" +#include "catalog_api.hpp" + +namespace duckdb { +class ICSchemaEntry; +class ICTransaction; + +enum class ICTypeAnnotation { STANDARD, CAST_TO_VARCHAR, NUMERIC_AS_DOUBLE, CTID, JSONB, FIXED_LENGTH_CHAR }; + +struct ICType { + idx_t oid = 0; + ICTypeAnnotation info = ICTypeAnnotation::STANDARD; + vector children; +}; + +class ICUtils { +public: + static LogicalType ToUCType(const LogicalType &input); + static LogicalType TypeToLogicalType(const string &columnDefinition); + static string TypeToString(const LogicalType &input); + static string LogicalToIcebergType(const LogicalType &input); +}; + +} // namespace duckdb diff --git a/src/include/ic_create_table_as_op.hpp b/src/include/ic_create_table_as_op.hpp new file mode 100644 index 0000000..a2ae47c --- /dev/null +++ b/src/include/ic_create_table_as_op.hpp @@ -0,0 +1,38 @@ + +#pragma once + +#include "duckdb/execution/physical_operator.hpp" +#include "duckdb/execution/operator/schema/physical_create_table.hpp" +#include "duckdb/planner/parsed_data/bound_create_table_info.hpp" + +#include "storage/ic_catalog.hpp" +#include "storage/ic_schema_entry.hpp" + +namespace duckdb { + +class ICCreateTableAsOp : public PhysicalOperator { +public: + ICCreateTableAsOp(vector types, + unique_ptr info, + ICSchemaEntry *schemaEntry, + idx_t estimated_cardinality, + ICCredentials &credentials) + : PhysicalOperator(PhysicalOperatorType::CREATE_TABLE_AS, types, estimated_cardinality), + info(std::move(info)), + schemaEntry(schemaEntry), + table_credentials(credentials) {} + + // Override the Execute method + OperatorResultType Execute(ExecutionContext &context, + DataChunk &input, + DataChunk &chunk, + GlobalOperatorState &gstate, + OperatorState &state) const override; + +private: + unique_ptr info; + ICSchemaEntry *schemaEntry; + ICCredentials &table_credentials; +}; + +} \ No newline at 
end of file diff --git a/src/include/storage/ic_catalog.hpp b/src/include/storage/ic_catalog.hpp new file mode 100644 index 0000000..1c0abee --- /dev/null +++ b/src/include/storage/ic_catalog.hpp @@ -0,0 +1,81 @@ + +#pragma once + +#include "duckdb/catalog/catalog.hpp" +#include "duckdb/function/table_function.hpp" +#include "duckdb/common/enums/access_mode.hpp" +#include "storage/ic_schema_set.hpp" + +namespace duckdb { +class ICSchemaEntry; + +struct ICCredentials { + string endpoint; + string client_id; + string client_secret; + // required to query s3 tables + string aws_region; + // Catalog generates the token using client id & secret + string token; +}; + +class ICClearCacheFunction : public TableFunction { +public: + ICClearCacheFunction(); + + static void ClearCacheOnSetting(ClientContext &context, SetScope scope, Value ¶meter); +}; + + +class ICCatalog : public Catalog { +public: + explicit ICCatalog(AttachedDatabase &db_p, const string &internal_name, AccessMode access_mode, + ICCredentials credentials); + ~ICCatalog(); + + string internal_name; + AccessMode access_mode; + ICCredentials credentials; + +public: + void Initialize(bool load_builtin) override; + string GetCatalogType() override { + return "iceberg"; + } + + optional_ptr CreateSchema(CatalogTransaction transaction, CreateSchemaInfo &info) override; + + void ScanSchemas(ClientContext &context, std::function callback) override; + + optional_ptr GetSchema(CatalogTransaction transaction, const string &schema_name, + OnEntryNotFound if_not_found, + QueryErrorContext error_context = QueryErrorContext()) override; + + unique_ptr PlanInsert(ClientContext &context, LogicalInsert &op, + unique_ptr plan) override; + unique_ptr PlanCreateTableAs(ClientContext &context, LogicalCreateTable &op, + unique_ptr plan) override; + unique_ptr PlanDelete(ClientContext &context, LogicalDelete &op, + unique_ptr plan) override; + unique_ptr PlanUpdate(ClientContext &context, LogicalUpdate &op, + unique_ptr plan) override; + unique_ptr BindCreateIndex(Binder &binder, CreateStatement &stmt, TableCatalogEntry &table, + unique_ptr plan) override; + + DatabaseSize GetDatabaseSize(ClientContext &context) override; + + //! 
Whether or not this is an in-memory PC database + bool InMemory() override; + string GetDBPath() override; + + void ClearCache(); + +private: + void DropSchema(ClientContext &context, DropInfo &info) override; + +private: + ICSchemaSet schemas; + string default_schema; +}; + +} // namespace duckdb diff --git a/src/include/storage/ic_catalog_set.hpp b/src/include/storage/ic_catalog_set.hpp new file mode 100644 index 0000000..fd988d4 --- /dev/null +++ b/src/include/storage/ic_catalog_set.hpp @@ -0,0 +1,37 @@ + +#pragma once + +#include "duckdb/transaction/transaction.hpp" +#include "duckdb/common/case_insensitive_map.hpp" +#include "duckdb/common/mutex.hpp" + +namespace duckdb { +struct DropInfo; +class ICSchemaEntry; +class ICTransaction; + +class ICCatalogSet { +public: + ICCatalogSet(Catalog &catalog); + + optional_ptr GetEntry(ClientContext &context, const string &name); + virtual void DropEntry(ClientContext &context, DropInfo &info); + void Scan(ClientContext &context, const std::function &callback); + virtual optional_ptr AddEntry(unique_ptr entry); + void ClearEntries(); + +protected: + virtual void LoadEntries(ClientContext &context) = 0; + virtual void FillEntry(ClientContext &context, unique_ptr &entry) = 0; + + void EraseEntryInternal(const string &name); + +protected: + Catalog &catalog; + case_insensitive_map_t> entries; + +private: + mutex entry_lock; +}; + +} // namespace duckdb diff --git a/src/include/storage/ic_schema_entry.hpp b/src/include/storage/ic_schema_entry.hpp new file mode 100644 index 0000000..344066b --- /dev/null +++ b/src/include/storage/ic_schema_entry.hpp @@ -0,0 +1,46 @@ + +#pragma once + +#include "catalog_api.hpp" +#include "duckdb/catalog/catalog_entry/schema_catalog_entry.hpp" +#include "storage/ic_table_set.hpp" + +namespace duckdb { +class ICTransaction; + +class ICSchemaEntry : public SchemaCatalogEntry { +public: + ICSchemaEntry(Catalog &catalog, CreateSchemaInfo &info); + ~ICSchemaEntry() override; + + unique_ptr schema_data; + +public: + optional_ptr CreateTable(CatalogTransaction transaction, BoundCreateTableInfo &info) override; + optional_ptr CreateFunction(CatalogTransaction transaction, CreateFunctionInfo &info) override; + optional_ptr CreateIndex(CatalogTransaction transaction, CreateIndexInfo &info, + TableCatalogEntry &table) override; + optional_ptr CreateView(CatalogTransaction transaction, CreateViewInfo &info) override; + optional_ptr CreateSequence(CatalogTransaction transaction, CreateSequenceInfo &info) override; + optional_ptr CreateTableFunction(CatalogTransaction transaction, + CreateTableFunctionInfo &info) override; + optional_ptr CreateCopyFunction(CatalogTransaction transaction, + CreateCopyFunctionInfo &info) override; + optional_ptr CreatePragmaFunction(CatalogTransaction transaction, + CreatePragmaFunctionInfo &info) override; + optional_ptr CreateCollation(CatalogTransaction transaction, CreateCollationInfo &info) override; + optional_ptr CreateType(CatalogTransaction transaction, CreateTypeInfo &info) override; + void Alter(CatalogTransaction transaction, AlterInfo &info) override; + void Scan(ClientContext &context, CatalogType type, const std::function &callback) override; + void Scan(CatalogType type, const std::function &callback) override; + void DropEntry(ClientContext &context, DropInfo &info) override; + optional_ptr GetEntry(CatalogTransaction transaction, CatalogType type, const string &name) override; + +private: + ICCatalogSet &GetCatalogSet(CatalogType type); + +private: + ICTableSet tables; +}; + +} // 
namespace duckdb diff --git a/src/include/storage/ic_schema_set.hpp b/src/include/storage/ic_schema_set.hpp new file mode 100644 index 0000000..6a5068c --- /dev/null +++ b/src/include/storage/ic_schema_set.hpp @@ -0,0 +1,23 @@ + +#pragma once + +#include "storage/ic_catalog_set.hpp" +#include "storage/ic_schema_entry.hpp" + +namespace duckdb { +struct CreateSchemaInfo; + +class ICSchemaSet : public ICCatalogSet { +public: + explicit ICSchemaSet(Catalog &catalog); + +public: + optional_ptr CreateSchema(ClientContext &context, CreateSchemaInfo &info); + void DropSchema(ClientContext &context, DropInfo &info); + +protected: + void LoadEntries(ClientContext &context) override; + void FillEntry(ClientContext &context, unique_ptr &entry) override; +}; + +} // namespace duckdb diff --git a/src/include/storage/ic_table_entry.hpp b/src/include/storage/ic_table_entry.hpp new file mode 100644 index 0000000..e415c1c --- /dev/null +++ b/src/include/storage/ic_table_entry.hpp @@ -0,0 +1,46 @@ + +#pragma once + +#include "catalog_api.hpp" +#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp" +#include "duckdb/parser/parsed_data/create_table_info.hpp" + +namespace duckdb { + +struct ICTableInfo { + ICTableInfo() { + create_info = make_uniq(); + } + ICTableInfo(const string &schema, const string &table) { + create_info = make_uniq(string(), schema, table); + } + ICTableInfo(const SchemaCatalogEntry &schema, const string &table) { + create_info = make_uniq((SchemaCatalogEntry &)schema, table); + } + + const string &GetTableName() const { + return create_info->table; + } + + unique_ptr create_info; +}; + +class ICTableEntry : public TableCatalogEntry { +public: + ICTableEntry(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info); + ICTableEntry(Catalog &catalog, SchemaCatalogEntry &schema, ICTableInfo &info); + + unique_ptr table_data; + +public: + unique_ptr GetStatistics(ClientContext &context, column_t column_id) override; + + TableFunction GetScanFunction(ClientContext &context, unique_ptr &bind_data) override; + + TableStorageInfo GetStorageInfo(ClientContext &context) override; + + void BindUpdateConstraints(Binder &binder, LogicalGet &get, LogicalProjection &proj, LogicalUpdate &update, + ClientContext &context) override; +}; + +} // namespace duckdb diff --git a/src/include/storage/ic_table_set.hpp b/src/include/storage/ic_table_set.hpp new file mode 100644 index 0000000..bbfbf80 --- /dev/null +++ b/src/include/storage/ic_table_set.hpp @@ -0,0 +1,51 @@ + +#pragma once + +#include "storage/ic_catalog_set.hpp" +#include "storage/ic_table_entry.hpp" + +namespace duckdb { +struct CreateTableInfo; +class ICResult; +class ICSchemaEntry; + + +class ICInSchemaSet : public ICCatalogSet { +public: + ICInSchemaSet(ICSchemaEntry &schema); + + optional_ptr AddEntry(unique_ptr entry) override; + +protected: + ICSchemaEntry &schema; +}; + + +class ICTableSet : public ICInSchemaSet { +public: + explicit ICTableSet(ICSchemaEntry &schema); + +public: + optional_ptr CreateTable(ClientContext &context, BoundCreateTableInfo &info); + static unique_ptr GetTableInfo(ClientContext &context, ICSchemaEntry &schema, const string &table_name); + optional_ptr RefreshTable(ClientContext &context, const string &table_name); + void AlterTable(ClientContext &context, AlterTableInfo &info); + void DropTable(ClientContext &context, DropInfo &info); + +protected: + void LoadEntries(ClientContext &context) override; + void FillEntry(ClientContext &context, unique_ptr &entry) override; + + void 
AlterTable(ClientContext &context, RenameTableInfo &info); + void AlterTable(ClientContext &context, RenameColumnInfo &info); + void AlterTable(ClientContext &context, AddColumnInfo &info); + void AlterTable(ClientContext &context, RemoveColumnInfo &info); + + static void AddColumn(ClientContext &context, ICResult &result, ICTableInfo &table_info, idx_t column_offset = 0); + +private: + unique_ptr CreateTableEntry(Catalog &catalog, SchemaCatalogEntry &schema, ICAPITable table); +}; + + +} // namespace duckdb diff --git a/src/include/storage/ic_transaction.hpp b/src/include/storage/ic_transaction.hpp new file mode 100644 index 0000000..79baf62 --- /dev/null +++ b/src/include/storage/ic_transaction.hpp @@ -0,0 +1,32 @@ + +#pragma once + +#include "duckdb/transaction/transaction.hpp" + +namespace duckdb { +class ICCatalog; +class ICSchemaEntry; +class ICTableEntry; + +enum class ICTransactionState { TRANSACTION_NOT_YET_STARTED, TRANSACTION_STARTED, TRANSACTION_FINISHED }; + +class ICTransaction : public Transaction { +public: + ICTransaction(ICCatalog &ic_catalog, TransactionManager &manager, ClientContext &context); + ~ICTransaction() override; + + void Start(); + void Commit(); + void Rollback(); + + static ICTransaction &Get(ClientContext &context, Catalog &catalog); + AccessMode GetAccessMode() const { + return access_mode; + } + +private: + ICTransactionState transaction_state; + AccessMode access_mode; +}; + +} // namespace duckdb diff --git a/src/include/storage/ic_transaction_manager.hpp b/src/include/storage/ic_transaction_manager.hpp new file mode 100644 index 0000000..5337234 --- /dev/null +++ b/src/include/storage/ic_transaction_manager.hpp @@ -0,0 +1,26 @@ + +#pragma once + +#include "duckdb/transaction/transaction_manager.hpp" +#include "storage/ic_catalog.hpp" +#include "storage/ic_transaction.hpp" + +namespace duckdb { + +class ICTransactionManager : public TransactionManager { +public: + ICTransactionManager(AttachedDatabase &db_p, ICCatalog &ic_catalog); + + Transaction &StartTransaction(ClientContext &context) override; + ErrorData CommitTransaction(ClientContext &context, Transaction &transaction) override; + void RollbackTransaction(Transaction &transaction) override; + + void Checkpoint(ClientContext &context, bool force = false) override; + +private: + ICCatalog &ic_catalog; + mutex transaction_lock; + reference_map_t> transactions; +}; + +} // namespace duckdb diff --git a/src/storage/ic_catalog.cpp b/src/storage/ic_catalog.cpp new file mode 100644 index 0000000..acae844 --- /dev/null +++ b/src/storage/ic_catalog.cpp @@ -0,0 +1,124 @@ +#include "storage/ic_catalog.hpp" +#include "storage/ic_schema_entry.hpp" +#include "storage/ic_transaction.hpp" +#include "duckdb/storage/database_size.hpp" +#include "duckdb/parser/parsed_data/drop_info.hpp" +#include "duckdb/parser/parsed_data/create_schema_info.hpp" +#include "duckdb/planner/operator/logical_create_table.hpp" +#include "duckdb/main/attached_database.hpp" +#include "duckdb/execution/operator/schema/physical_create_table.hpp" +#include "ic_create_table_as_op.hpp" + +namespace duckdb { + +ICCatalog::ICCatalog(AttachedDatabase &db_p, const string &internal_name, AccessMode access_mode, + ICCredentials credentials) + : Catalog(db_p), internal_name(internal_name), access_mode(access_mode), credentials(std::move(credentials)), + schemas(*this) { +} + +ICCatalog::~ICCatalog() = default; + +void ICCatalog::Initialize(bool load_builtin) { +} + +optional_ptr ICCatalog::CreateSchema(CatalogTransaction transaction, 
CreateSchemaInfo &info) { + if (info.on_conflict == OnCreateConflict::REPLACE_ON_CONFLICT) { + DropInfo try_drop; + try_drop.type = CatalogType::SCHEMA_ENTRY; + try_drop.name = info.schema; + try_drop.if_not_found = OnEntryNotFound::RETURN_NULL; + try_drop.cascade = false; + schemas.DropSchema(transaction.GetContext(), try_drop); + } + return schemas.CreateSchema(transaction.GetContext(), info); +} + +void ICCatalog::DropSchema(ClientContext &context, DropInfo &info) { + return schemas.DropSchema(context, info); +} + +void ICCatalog::ScanSchemas(ClientContext &context, std::function callback) { + schemas.Scan(context, [&](CatalogEntry &schema) { callback(schema.Cast()); }); +} + +optional_ptr ICCatalog::GetSchema(CatalogTransaction transaction, const string &schema_name, + OnEntryNotFound if_not_found, QueryErrorContext error_context) { + if (schema_name == DEFAULT_SCHEMA) { + if (default_schema.empty()) { + throw InvalidInputException("Attempting to fetch the default schema - but no database was " + "provided in the connection string"); + } + return GetSchema(transaction, default_schema, if_not_found, error_context); + } + auto entry = schemas.GetEntry(transaction.GetContext(), schema_name); + if (!entry && if_not_found != OnEntryNotFound::RETURN_NULL) { + throw BinderException("Schema with name \"%s\" not found", schema_name); + } + return reinterpret_cast(entry.get()); +} + +bool ICCatalog::InMemory() { + return false; +} + +string ICCatalog::GetDBPath() { + return internal_name; +} + +DatabaseSize ICCatalog::GetDatabaseSize(ClientContext &context) { + if (default_schema.empty()) { + throw InvalidInputException("Attempting to fetch the database size - but no database was provided " + "in the connection string"); + } + DatabaseSize size; + return size; +} + +void ICCatalog::ClearCache() { + schemas.ClearEntries(); +} + +unique_ptr ICCatalog::PlanInsert(ClientContext &context, LogicalInsert &op, + unique_ptr plan) { + throw NotImplementedException("ICCatalog PlanInsert"); +} + +unique_ptr ICCatalog::PlanCreateTableAs( + ClientContext &context, LogicalCreateTable &op, unique_ptr plan) { + + CatalogEntry *catalog_entry = schemas.GetEntry(context, op.info->base->schema).get(); + if (!catalog_entry) { + throw BinderException("Schema not found"); + } + + auto schemaEntry = static_cast(catalog_entry); + auto create_table_as = make_uniq( + op.types, + std::move(op.info), + schemaEntry, + op.estimated_cardinality, + credentials); + + // If your operator has children (input plan), attach them + if (plan) { + create_table_as->children.push_back(std::move(plan)); + } + + return create_table_as; +} + +unique_ptr ICCatalog::PlanDelete(ClientContext &context, LogicalDelete &op, + unique_ptr plan) { + throw NotImplementedException("ICCatalog PlanDelete"); +} +unique_ptr ICCatalog::PlanUpdate(ClientContext &context, LogicalUpdate &op, + unique_ptr plan) { + throw NotImplementedException("ICCatalog PlanUpdate"); +} +unique_ptr ICCatalog::BindCreateIndex(Binder &binder, CreateStatement &stmt, TableCatalogEntry &table, + unique_ptr plan) { + throw NotImplementedException("ICCatalog BindCreateIndex"); +} + +} // namespace duckdb diff --git a/src/storage/ic_catalog_set.cpp b/src/storage/ic_catalog_set.cpp new file mode 100644 index 0000000..13fb2e6 --- /dev/null +++ b/src/storage/ic_catalog_set.cpp @@ -0,0 +1,54 @@ +#include "storage/ic_catalog_set.hpp" +#include "storage/ic_transaction.hpp" +#include "duckdb/parser/parsed_data/drop_info.hpp" +#include "storage/ic_schema_entry.hpp" + +namespace duckdb { + 
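+// The catalog set is a lazy in-memory cache of remote entries: LoadEntries()
+// populates it on first access and ClearEntries() (exposed through the clear-cache
+// table function) invalidates it. Note that LoadEntries() runs before entry_lock
+// is taken in GetEntry/Scan below, so a concurrent first load is not fully
+// synchronized.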
+ICCatalogSet::ICCatalogSet(Catalog &catalog) : catalog(catalog) { +} + +optional_ptr ICCatalogSet::GetEntry(ClientContext &context, const string &name) { + LoadEntries(context); + lock_guard l(entry_lock); + auto entry = entries.find(name); + if (entry == entries.end()) { + return nullptr; + } + FillEntry(context, entry->second); + return entry->second.get(); +} + +void ICCatalogSet::DropEntry(ClientContext &context, DropInfo &info) { + EraseEntryInternal(info.name); +} + +void ICCatalogSet::EraseEntryInternal(const string &name) { + lock_guard l(entry_lock); + entries.erase(name); +} + +void ICCatalogSet::Scan(ClientContext &context, const std::function &callback) { + LoadEntries(context); + + lock_guard l(entry_lock); + for (auto &entry : entries) { + callback(*entry.second); + } +} + +optional_ptr ICCatalogSet::AddEntry(unique_ptr entry) { + lock_guard l(entry_lock); + auto result = entry.get(); + if (result->name.empty()) { + throw InternalException("ICCatalogSet::CreateEntry called with empty name"); + } + entries.insert(make_pair(result->name, std::move(entry))); + return result; +} + +void ICCatalogSet::ClearEntries() { + entries.clear(); +} + +} // namespace duckdb diff --git a/src/storage/ic_clear_cache.cpp b/src/storage/ic_clear_cache.cpp new file mode 100644 index 0000000..0e07887 --- /dev/null +++ b/src/storage/ic_clear_cache.cpp @@ -0,0 +1,50 @@ +#include "duckdb.hpp" + +#include "duckdb/parser/parsed_data/create_table_function_info.hpp" +#include "duckdb/main/database_manager.hpp" +#include "duckdb/main/attached_database.hpp" +#include "storage/ic_catalog.hpp" + +namespace duckdb { + +struct ClearCacheFunctionData : public TableFunctionData { + bool finished = false; +}; + +static unique_ptr ClearCacheBind(ClientContext &context, TableFunctionBindInput &input, + vector &return_types, vector &names) { + + auto result = make_uniq(); + return_types.push_back(LogicalType::BOOLEAN); + names.emplace_back("Success"); + return std::move(result); +} + +static void ClearUCCaches(ClientContext &context) { + auto databases = DatabaseManager::Get(context).GetDatabases(context); + for (auto &db_ref : databases) { + auto &db = db_ref.get(); + auto &catalog = db.GetCatalog(); + if (catalog.GetCatalogType() != "iceberg") { + continue; + } + catalog.Cast().ClearCache(); + } +} + +static void ClearCacheFunction(ClientContext &context, TableFunctionInput &data_p, DataChunk &output) { + auto &data = data_p.bind_data->CastNoConst(); + if (data.finished) { + return; + } + ClearUCCaches(context); + data.finished = true; +} + +void ICClearCacheFunction::ClearCacheOnSetting(ClientContext &context, SetScope scope, Value ¶meter) { + ClearUCCaches(context); +} + +ICClearCacheFunction::ICClearCacheFunction() : TableFunction("pc_clear_cache", {}, ClearCacheFunction, ClearCacheBind) { +} +} // namespace duckdb diff --git a/src/storage/ic_schema_entry.cpp b/src/storage/ic_schema_entry.cpp new file mode 100644 index 0000000..4046ac7 --- /dev/null +++ b/src/storage/ic_schema_entry.cpp @@ -0,0 +1,165 @@ +#include "storage/ic_schema_entry.hpp" +#include "storage/ic_table_entry.hpp" +#include "storage/ic_transaction.hpp" +#include "duckdb/parser/parsed_data/create_view_info.hpp" +#include "duckdb/parser/parsed_data/create_index_info.hpp" +#include "duckdb/planner/parsed_data/bound_create_table_info.hpp" +#include "duckdb/parser/parsed_data/drop_info.hpp" +#include "duckdb/parser/constraints/list.hpp" +#include "duckdb/common/unordered_set.hpp" +#include "duckdb/parser/parsed_data/alter_info.hpp" +#include 
"duckdb/parser/parsed_data/alter_table_info.hpp" +#include "duckdb/parser/parsed_expression_iterator.hpp" + +namespace duckdb { + +ICSchemaEntry::ICSchemaEntry(Catalog &catalog, CreateSchemaInfo &info) + : SchemaCatalogEntry(catalog, info), tables(*this) { +} + +ICSchemaEntry::~ICSchemaEntry() { +} + +ICTransaction &GetUCTransaction(CatalogTransaction transaction) { + if (!transaction.transaction) { + throw InternalException("No transaction!?"); + } + return transaction.transaction->Cast(); +} + +optional_ptr ICSchemaEntry::CreateTable(CatalogTransaction transaction, BoundCreateTableInfo &info) { + auto &base_info = info.Base(); + auto table_name = base_info.table; + if (base_info.on_conflict == OnCreateConflict::REPLACE_ON_CONFLICT) { + throw NotImplementedException("REPLACE ON CONFLICT in CreateTable"); + } + return tables.CreateTable(transaction.GetContext(), info); +} + +void ICSchemaEntry::DropEntry(ClientContext &context, DropInfo &info) { + if (info.type != CatalogType::TABLE_ENTRY) { + throw BinderException("Expecting table entry"); + } + tables.DropTable(context, info); + GetCatalogSet(info.type).DropEntry(context, info); +} + +optional_ptr ICSchemaEntry::CreateFunction(CatalogTransaction transaction, CreateFunctionInfo &info) { + throw BinderException("PC databases do not support creating functions"); +} + +void ICUnqualifyColumnRef(ParsedExpression &expr) { + if (expr.type == ExpressionType::COLUMN_REF) { + auto &colref = expr.Cast(); + auto name = std::move(colref.column_names.back()); + colref.column_names = {std::move(name)}; + return; + } + ParsedExpressionIterator::EnumerateChildren(expr, ICUnqualifyColumnRef); +} + +optional_ptr ICSchemaEntry::CreateIndex(CatalogTransaction transaction, CreateIndexInfo &info, + TableCatalogEntry &table) { + throw NotImplementedException("CreateIndex"); +} + +string GetUCCreateView(CreateViewInfo &info) { + throw NotImplementedException("GetCreateView"); +} + +optional_ptr ICSchemaEntry::CreateView(CatalogTransaction transaction, CreateViewInfo &info) { + if (info.sql.empty()) { + throw BinderException("Cannot create view that originated from an " + "empty SQL statement"); + } + if (info.on_conflict == OnCreateConflict::REPLACE_ON_CONFLICT || + info.on_conflict == OnCreateConflict::IGNORE_ON_CONFLICT) { + auto current_entry = GetEntry(transaction, CatalogType::VIEW_ENTRY, info.view_name); + if (current_entry) { + if (info.on_conflict == OnCreateConflict::IGNORE_ON_CONFLICT) { + return current_entry; + } + throw NotImplementedException("REPLACE ON CONFLICT in CreateView"); + } + } + auto &ic_transaction = GetUCTransaction(transaction); + // ic_transaction.Query(GetUCCreateView(info)); + return tables.RefreshTable(transaction.GetContext(), info.view_name); +} + +optional_ptr ICSchemaEntry::CreateType(CatalogTransaction transaction, CreateTypeInfo &info) { + throw BinderException("PC databases do not support creating types"); +} + +optional_ptr ICSchemaEntry::CreateSequence(CatalogTransaction transaction, CreateSequenceInfo &info) { + throw BinderException("PC databases do not support creating sequences"); +} + +optional_ptr ICSchemaEntry::CreateTableFunction(CatalogTransaction transaction, + CreateTableFunctionInfo &info) { + throw BinderException("PC databases do not support creating table functions"); +} + +optional_ptr ICSchemaEntry::CreateCopyFunction(CatalogTransaction transaction, + CreateCopyFunctionInfo &info) { + throw BinderException("PC databases do not support creating copy functions"); +} + +optional_ptr 
ICSchemaEntry::CreatePragmaFunction(CatalogTransaction transaction, + CreatePragmaFunctionInfo &info) { + throw BinderException("PC databases do not support creating pragma functions"); +} + +optional_ptr ICSchemaEntry::CreateCollation(CatalogTransaction transaction, CreateCollationInfo &info) { + throw BinderException("PC databases do not support creating collations"); +} + +void ICSchemaEntry::Alter(CatalogTransaction transaction, AlterInfo &info) { + if (info.type != AlterType::ALTER_TABLE) { + throw BinderException("Only altering tables is supported for now"); + } + auto &alter = info.Cast(); + tables.AlterTable(transaction.GetContext(), alter); +} + +bool CatalogTypeIsSupported(CatalogType type) { + switch (type) { + case CatalogType::INDEX_ENTRY: + case CatalogType::TABLE_ENTRY: + case CatalogType::VIEW_ENTRY: + return true; + default: + return false; + } +} + +void ICSchemaEntry::Scan(ClientContext &context, CatalogType type, + const std::function &callback) { + if (!CatalogTypeIsSupported(type)) { + return; + } + GetCatalogSet(type).Scan(context, callback); +} +void ICSchemaEntry::Scan(CatalogType type, const std::function &callback) { + throw NotImplementedException("Scan without context not supported"); +} + +optional_ptr ICSchemaEntry::GetEntry(CatalogTransaction transaction, CatalogType type, + const string &name) { + if (!CatalogTypeIsSupported(type)) { + return nullptr; + } + return GetCatalogSet(type).GetEntry(transaction.GetContext(), name); +} + +ICCatalogSet &ICSchemaEntry::GetCatalogSet(CatalogType type) { + switch (type) { + case CatalogType::TABLE_ENTRY: + case CatalogType::VIEW_ENTRY: + return tables; + default: + throw InternalException("Type not supported for GetCatalogSet"); + } +} + +} // namespace duckdb diff --git a/src/storage/ic_schema_set.cpp b/src/storage/ic_schema_set.cpp new file mode 100644 index 0000000..28db677 --- /dev/null +++ b/src/storage/ic_schema_set.cpp @@ -0,0 +1,56 @@ +#include "catalog_api.hpp" +#include "duckdb/parser/parsed_data/create_schema_info.hpp" +#include "duckdb/parser/parsed_data/drop_info.hpp" +#include "duckdb/catalog/catalog.hpp" +#include "storage/ic_catalog.hpp" +#include "storage/ic_schema_set.hpp" +#include "storage/ic_transaction.hpp" + +namespace duckdb { + +ICSchemaSet::ICSchemaSet(Catalog &catalog) : ICCatalogSet(catalog) { +} + +static bool IsInternalTable(const string &catalog, const string &schema) { + if (schema == "information_schema") { + return true; + } + return false; +} + +void ICSchemaSet::LoadEntries(ClientContext &context) { + if (!entries.empty()) { + return; + } + + auto &ic_catalog = catalog.Cast(); + auto schemas = ICAPI::GetSchemas(catalog.GetName(), ic_catalog.internal_name, ic_catalog.credentials); + for (const auto &schema : schemas) { + CreateSchemaInfo info; + info.schema = schema.schema_name; + info.internal = IsInternalTable(schema.catalog_name, schema.schema_name); + auto schema_entry = make_uniq(catalog, info); + schema_entry->schema_data = make_uniq(schema); + AddEntry(std::move(schema_entry)); + } +} + +void ICSchemaSet::FillEntry(ClientContext &context, unique_ptr &entry) { + // Nothing to do +} + +optional_ptr ICSchemaSet::CreateSchema(ClientContext &context, CreateSchemaInfo &info) { + auto &ic_catalog = catalog.Cast(); + auto schema = ICAPI::CreateSchema(catalog.GetName(), ic_catalog.internal_name, info.schema, ic_catalog.credentials); + auto schema_entry = make_uniq(catalog, info); + schema_entry->schema_data = make_uniq(schema); + return AddEntry(std::move(schema_entry)); +} + +void 
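+// DropSchema below first issues the REST catalog's DELETE
+// /v1/{prefix}/namespaces/{schema} request via ICAPI::DropSchema (which throws if
+// the server rejects it) and only then evicts the cached entry, so a failed remote
+// drop leaves the local view unchanged.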
+void ICSchemaSet::DropSchema(ClientContext &context, DropInfo &info) {
+    auto &ic_catalog = catalog.Cast<ICCatalog>();
+    ICAPI::DropSchema(ic_catalog.internal_name, info.name, ic_catalog.credentials);
+    DropEntry(context, info);
+}
+
+} // namespace duckdb
diff --git a/src/storage/ic_table_entry.cpp b/src/storage/ic_table_entry.cpp
new file mode 100644
index 0000000..7ce387c
--- /dev/null
+++ b/src/storage/ic_table_entry.cpp
@@ -0,0 +1,146 @@
+#include "storage/ic_catalog.hpp"
+#include "storage/ic_schema_entry.hpp"
+#include "storage/ic_table_entry.hpp"
+#include "storage/ic_transaction.hpp"
+#include "duckdb/storage/statistics/base_statistics.hpp"
+#include "duckdb/storage/table_storage_info.hpp"
+#include "duckdb/main/extension_util.hpp"
+#include "duckdb/main/database.hpp"
+#include "duckdb/main/secret/secret_manager.hpp"
+#include "duckdb/catalog/catalog_entry/table_function_catalog_entry.hpp"
+#include "duckdb/parser/tableref/table_function_ref.hpp"
+#include "catalog_api.hpp"
+#include "duckdb/planner/binder.hpp"
+#include "duckdb/planner/tableref/bound_table_function.hpp"
+#include "duckdb/planner/logical_operator.hpp"
+#include "duckdb/planner/operator/logical_get.hpp"
+
+#include <algorithm>
+#include <cctype>
+
+namespace duckdb {
+
+ICTableEntry::ICTableEntry(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info)
+    : TableCatalogEntry(catalog, schema, info) {
+    this->internal = false;
+}
+
+ICTableEntry::ICTableEntry(Catalog &catalog, SchemaCatalogEntry &schema, ICTableInfo &info)
+    : TableCatalogEntry(catalog, schema, *info.create_info) {
+    this->internal = false;
+}
+
+unique_ptr<BaseStatistics> ICTableEntry::GetStatistics(ClientContext &context, column_t column_id) {
+    return nullptr;
+}
+
+void ICTableEntry::BindUpdateConstraints(Binder &binder, LogicalGet &, LogicalProjection &, LogicalUpdate &,
+                                         ClientContext &) {
+    throw NotImplementedException("BindUpdateConstraints");
+}
+
+struct MyIcebergFunctionData : public FunctionData {
+    std::string path; // store the path or any other relevant info here
+
+    // Optional: implement Copy for caching/pushdown logic
+    unique_ptr<FunctionData> Copy() const override {
+        auto copy = make_uniq<MyIcebergFunctionData>();
+        copy->path = path;
+        return copy;
+    }
+
+    // Optional: implement Equals for caching
+    bool Equals(const FunctionData &other_p) const override {
+        auto &other = other_p.Cast<MyIcebergFunctionData>();
+        return path == other.path;
+    }
+};
+
+TableFunction ICTableEntry::GetScanFunction(ClientContext &context, unique_ptr<FunctionData> &bind_data) {
+    auto &db = DatabaseInstance::GetDatabase(context);
+    auto &ic_catalog = catalog.Cast<ICCatalog>();
+
+    auto &parquet_function_set = ExtensionUtil::GetTableFunction(db, "parquet_scan");
+    auto parquet_scan_function =
+        parquet_function_set.functions.GetFunctionByArguments(context, {LogicalType::VARCHAR});
+
+    auto &iceberg_function_set = ExtensionUtil::GetTableFunction(db, "iceberg_scan");
+    auto iceberg_scan_function =
+        iceberg_function_set.functions.GetFunctionByArguments(context, {LogicalType::VARCHAR});
+
+    D_ASSERT(table_data);
+
+    if (table_data->data_source_format != "ICEBERG") {
+        throw NotImplementedException("Table '%s' is of unsupported format '%s'", table_data->name,
+                                      table_data->data_source_format);
+    }
+
+    if (table_data->metadata_location.find("file://") != 0) {
+        auto &secret_manager = SecretManager::Get(context);
+        // Fetch vended credentials for this table from the catalog API
+        auto table_credentials = ICAPI::GetTableCredentials(
+            ic_catalog.internal_name, table_data->schema_name, table_data->name, ic_catalog.credentials);
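+        // The temporary secret created below is roughly equivalent to running
+        // (placeholder values):
+        //   CREATE SECRET "__internal_ic_<table_id>" (
+        //       TYPE S3, PROVIDER config, KEY_ID '...', SECRET '...',
+        //       SESSION_TOKEN '...', REGION '...', SCOPE 's3://<table prefix>');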
+        // Inject the secret into the secret manager, scoped to this table's path
+        CreateSecretInfo info(OnCreateConflict::REPLACE_ON_CONFLICT, SecretPersistType::TEMPORARY);
+        info.name = "__internal_ic_" + table_data->table_id;
+        info.type = "s3";
+        info.provider = "config";
+        info.storage_type = "memory";
+        info.options = {
+            {"key_id", table_credentials.key_id},
+            {"secret", table_credentials.secret},
+            {"session_token", table_credentials.session_token},
+            {"region", ic_catalog.credentials.aws_region},
+        };
+
+        // Scope the secret to the storage prefix, i.e. everything before "metadata"
+        std::string lc_storage_location;
+        lc_storage_location.resize(table_data->metadata_location.size());
+        std::transform(table_data->metadata_location.begin(), table_data->metadata_location.end(),
+                       lc_storage_location.begin(), ::tolower);
+        size_t metadata_pos = lc_storage_location.find("metadata");
+        if (metadata_pos != std::string::npos) {
+            info.scope = {lc_storage_location.substr(0, metadata_pos)};
+        } else {
+            throw std::runtime_error("Could not determine secret scope: no 'metadata' segment in location '" +
+                                     table_data->metadata_location + "'");
+        }
+
+        auto my_secret = secret_manager.CreateSecret(context, info);
+    }
+
+    named_parameter_map_t param_map;
+    vector<LogicalType> return_types;
+    vector<string> names;
+    TableFunctionRef empty_ref;
+
+    // Pass the metadata location as the input to the iceberg_scan table function
+    vector<Value> inputs = {table_data->metadata_location};
+
+    TableFunctionBindInput bind_input(inputs, param_map, return_types, names, nullptr, nullptr,
+                                      iceberg_scan_function, empty_ref);
+
+    auto table_ref = iceberg_scan_function.bind_replace(context, bind_input);
+
+    // 1) Create a Binder and bind the parser-level TableRef into a BoundTableRef
+    auto binder = Binder::CreateBinder(context);
+    auto bound_ref = binder->Bind(*table_ref);
+
+    // 2) Create a logical plan from the bound reference
+    unique_ptr<LogicalOperator> logical_plan = binder->CreatePlan(*bound_ref);
+
+    // 3) For a single table function, the root of the plan should be a LogicalGet
+    LogicalOperator *op = logical_plan.get();
+    if (op->type != LogicalOperatorType::LOGICAL_GET) {
+        throw std::runtime_error("Expected a LogicalGet at the root of the iceberg_scan plan");
+    }
+
+    // 4) Steal the bind data from the LogicalGet and hand it to parquet_scan
+    auto &get = op->Cast<LogicalGet>();
+    bind_data = std::move(get.bind_data);
+
+    return parquet_scan_function;
+}
+
+TableStorageInfo ICTableEntry::GetStorageInfo(ClientContext &context) {
+    TableStorageInfo result;
+    // TODO: fill in storage info
+    return result;
+}
+
+} // namespace duckdb
diff --git a/src/storage/ic_table_set.cpp b/src/storage/ic_table_set.cpp
new file mode 100644
index 0000000..2dca47e
--- /dev/null
+++ b/src/storage/ic_table_set.cpp
@@ -0,0 +1,123 @@
+#include "catalog_api.hpp"
+#include "catalog_utils.hpp"
+
+#include "storage/ic_catalog.hpp"
+#include "storage/ic_table_set.hpp"
+#include "storage/ic_transaction.hpp"
+#include "duckdb/parser/parsed_data/create_table_info.hpp"
+#include "duckdb/parser/constraints/not_null_constraint.hpp"
+#include "duckdb/parser/constraints/unique_constraint.hpp"
+#include "duckdb/parser/expression/constant_expression.hpp"
+#include "duckdb/planner/parsed_data/bound_create_table_info.hpp"
+#include "duckdb/parser/parsed_data/drop_info.hpp"
+#include "duckdb/catalog/dependency_list.hpp"
+#include "duckdb/parser/constraints/list.hpp"
+#include "storage/ic_schema_entry.hpp"
+#include "duckdb/parser/parser.hpp"
+
+namespace duckdb {
+
+ICTableSet::ICTableSet(ICSchemaEntry &schema) : ICInSchemaSet(schema) {
+}
+
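+// Map a column description returned by the catalog REST API onto a DuckDB
+// ColumnDefinition; the Iceberg-to-DuckDB type mapping lives in
+// ICUtils::TypeToLogicalType.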
+static ColumnDefinition CreateColumnDefinition(ICAPIColumnDefinition &coldef) {
+    return {coldef.name, ICUtils::TypeToLogicalType(coldef.type_text)};
+}
+
+unique_ptr<CatalogEntry> ICTableSet::CreateTableEntry(Catalog &catalog, SchemaCatalogEntry &schema, ICAPITable table) {
+    D_ASSERT(schema.name == table.schema_name);
+    CreateTableInfo info;
+    info.table = table.name;
+    for (auto &col : table.columns) {
+        info.columns.AddColumn(CreateColumnDefinition(col));
+    }
+
+    auto table_entry = make_uniq<ICTableEntry>(catalog, schema, info);
+    table_entry->table_data = make_uniq<ICAPITable>(table);
+    return table_entry;
+}
+
+void ICTableSet::FillEntry(ClientContext &context, unique_ptr<CatalogEntry> &entry) {
+    auto *derived = static_cast<ICTableEntry *>(entry.get());
+    if (!derived->table_data->metadata_location.empty()) {
+        return;
+    }
+
+    auto &ic_catalog = catalog.Cast<ICCatalog>();
+    auto table =
+        ICAPI::GetTable(catalog.GetName(), catalog.GetDBPath(), schema.name, entry->name, ic_catalog.credentials);
+    entry = CreateTableEntry(catalog, schema, table);
+}
+
+void ICTableSet::LoadEntries(ClientContext &context) {
+    if (!entries.empty()) {
+        return;
+    }
+
+    auto &ic_catalog = catalog.Cast<ICCatalog>();
+    // TODO: handle out-of-order columns using the position property
+    auto tables = ICAPI::GetTables(catalog.GetName(), catalog.GetDBPath(), schema.name, ic_catalog.credentials);
+
+    for (auto &table : tables) {
+        auto entry = CreateTableEntry(catalog, schema, table);
+        AddEntry(std::move(entry));
+    }
+}
+
+optional_ptr<CatalogEntry> ICTableSet::RefreshTable(ClientContext &context, const string &table_name) {
+    auto table_info = GetTableInfo(context, schema, table_name);
+    auto table_entry = make_uniq<ICTableEntry>(catalog, schema, *table_info);
+    auto table_ptr = table_entry.get();
+    AddEntry(std::move(table_entry));
+    return table_ptr;
+}
+
+unique_ptr<ICTableInfo> ICTableSet::GetTableInfo(ClientContext &context, ICSchemaEntry &schema,
+                                                 const string &table_name) {
+    throw NotImplementedException("ICTableSet::GetTableInfo");
+}
+
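+// Write path: CREATE TABLE is forwarded to the catalog service, e.g.
+//   CREATE TABLE my_catalog.my_schema.t (id BIGINT, name VARCHAR);
+// The table description returned by the service becomes the cached entry.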
+optional_ptr<CatalogEntry> ICTableSet::CreateTable(ClientContext &context, BoundCreateTableInfo &info) {
+    auto &ic_catalog = catalog.Cast<ICCatalog>();
+    auto *table_info = dynamic_cast<CreateTableInfo *>(info.base.get());
+    auto table =
+        ICAPI::CreateTable(catalog.GetName(), ic_catalog.internal_name, schema.name, ic_catalog.credentials, table_info);
+    auto entry = CreateTableEntry(catalog, schema, table);
+    return AddEntry(std::move(entry));
+}
+
+void ICTableSet::DropTable(ClientContext &context, DropInfo &info) {
+    auto &ic_catalog = catalog.Cast<ICCatalog>();
+    ICAPI::DropTable(catalog.GetName(), ic_catalog.internal_name, schema.name, info.name, ic_catalog.credentials);
+}
+
+void ICTableSet::AlterTable(ClientContext &context, RenameTableInfo &info) {
+    throw NotImplementedException("ICTableSet::AlterTable");
+}
+
+void ICTableSet::AlterTable(ClientContext &context, RenameColumnInfo &info) {
+    throw NotImplementedException("ICTableSet::AlterTable");
+}
+
+void ICTableSet::AlterTable(ClientContext &context, AddColumnInfo &info) {
+    throw NotImplementedException("ICTableSet::AlterTable");
+}
+
+void ICTableSet::AlterTable(ClientContext &context, RemoveColumnInfo &info) {
+    throw NotImplementedException("ICTableSet::AlterTable");
+}
+
+void ICTableSet::AlterTable(ClientContext &context, AlterTableInfo &alter) {
+    throw NotImplementedException("ICTableSet::AlterTable");
+}
+
+ICInSchemaSet::ICInSchemaSet(ICSchemaEntry &schema) : ICCatalogSet(schema.ParentCatalog()), schema(schema) {
+}
+
+optional_ptr<CatalogEntry> ICInSchemaSet::AddEntry(unique_ptr<CatalogEntry> entry) {
+    if (!entry->internal) {
+        entry->internal = schema.internal;
+    }
+    return ICCatalogSet::AddEntry(std::move(entry));
+}
+
+} // namespace duckdb
diff --git a/src/storage/ic_transaction.cpp b/src/storage/ic_transaction.cpp
new file mode 100644
index 0000000..70c29c7
--- /dev/null
+++ b/src/storage/ic_transaction.cpp
@@ -0,0 +1,61 @@
+#include "storage/ic_transaction.hpp"
+#include "storage/ic_catalog.hpp"
+#include "duckdb/parser/parsed_data/create_view_info.hpp"
+#include "duckdb/catalog/catalog_entry/index_catalog_entry.hpp"
+#include "duckdb/catalog/catalog_entry/view_catalog_entry.hpp"
+
+namespace duckdb {
+
+ICTransaction::ICTransaction(ICCatalog &ic_catalog, TransactionManager &manager, ClientContext &context)
+    : Transaction(manager, context), access_mode(ic_catalog.access_mode) {
+    // connection = ICConnection::Open(ic_catalog.path);
+}
+
+ICTransaction::~ICTransaction() = default;
+
+void ICTransaction::Start() {
+    transaction_state = ICTransactionState::TRANSACTION_NOT_YET_STARTED;
+}
+
+void ICTransaction::Commit() {
+    if (transaction_state == ICTransactionState::TRANSACTION_STARTED) {
+        transaction_state = ICTransactionState::TRANSACTION_FINISHED;
+        // connection.Execute("COMMIT");
+    }
+}
+
+void ICTransaction::Rollback() {
+    if (transaction_state == ICTransactionState::TRANSACTION_STARTED) {
+        transaction_state = ICTransactionState::TRANSACTION_FINISHED;
+        // connection.Execute("ROLLBACK");
+    }
+}
+
+// Kept for reference: how a connection-backed transaction would start lazily.
+// ICConnection &ICTransaction::GetConnection() {
+//     if (transaction_state == ICTransactionState::TRANSACTION_NOT_YET_STARTED) {
+//         transaction_state = ICTransactionState::TRANSACTION_STARTED;
+//         string query = "START TRANSACTION";
+//         if (access_mode == AccessMode::READ_ONLY) {
+//             query += " READ ONLY";
+//         }
+//         connection.Execute(query);
+//     }
+//     return connection;
+// }
+
+// unique_ptr<QueryResult> ICTransaction::Query(const string &query) {
+//     if (transaction_state == ICTransactionState::TRANSACTION_NOT_YET_STARTED) {
+//         transaction_state = ICTransactionState::TRANSACTION_STARTED;
+//         string transaction_start = "START TRANSACTION";
+//         if (access_mode == AccessMode::READ_ONLY) {
+//             transaction_start += " READ ONLY";
+//         }
+//         connection.Query(transaction_start);
+//         return connection.Query(query);
+//     }
+//     return connection.Query(query);
+// }
+
+ICTransaction &ICTransaction::Get(ClientContext &context, Catalog &catalog) {
+    return Transaction::Get(context, catalog).Cast<ICTransaction>();
+}
+
+} // namespace duckdb
diff --git a/src/storage/ic_transaction_manager.cpp b/src/storage/ic_transaction_manager.cpp
new file mode 100644
index 0000000..f034de9
--- /dev/null
+++ b/src/storage/ic_transaction_manager.cpp
@@ -0,0 +1,40 @@
+#include "storage/ic_transaction_manager.hpp"
+#include "duckdb/main/attached_database.hpp"
+
+namespace duckdb {
+
+ICTransactionManager::ICTransactionManager(AttachedDatabase &db_p, ICCatalog &ic_catalog)
+    : TransactionManager(db_p), ic_catalog(ic_catalog) {
+}
+
+Transaction &ICTransactionManager::StartTransaction(ClientContext &context) {
+    auto transaction = make_uniq<ICTransaction>(ic_catalog, *this, context);
+    transaction->Start();
+    auto &result = *transaction;
+    lock_guard<mutex> l(transaction_lock);
+    transactions[result] = std::move(transaction);
+    return result;
+}
+
+ErrorData ICTransactionManager::CommitTransaction(ClientContext &context, Transaction &transaction) {
+    auto &ic_transaction = transaction.Cast<ICTransaction>();
+    ic_transaction.Commit();
+    lock_guard<mutex> l(transaction_lock);
+    transactions.erase(transaction);
+    return ErrorData();
+}
+
+void ICTransactionManager::RollbackTransaction(Transaction &transaction) {
+    auto &ic_transaction = transaction.Cast<ICTransaction>();
+    ic_transaction.Rollback();
+    lock_guard<mutex> l(transaction_lock);
+    transactions.erase(transaction);
+}
+
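+// Checkpoint is currently a no-op: an attached Iceberg catalog keeps no local
+// WAL to flush. The commented-out lines sketch what a connection-backed
+// implementation might do.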
+void ICTransactionManager::Checkpoint(ClientContext &context, bool force) {
+    auto &transaction = ICTransaction::Get(context, db.GetCatalog());
+    // auto &db = transaction.GetConnection();
+    // db.Execute("CHECKPOINT");
+}
+
+} // namespace duckdb
diff --git a/vcpkg.json b/vcpkg.json
index 94ab7f7..17661df 100644
--- a/vcpkg.json
+++ b/vcpkg.json
@@ -4,6 +4,7 @@
       "name": "avro-cpp",
       "features": ["snappy"]
     },
+    "curl",
     "openssl"
   ],
   "vcpkg-configuration": {