From d6703df40b24477d0a5a36939746bb1b36cc6933 Mon Sep 17 00:00:00 2001
From: TennyZhuang
Date: Mon, 18 Mar 2024 11:32:53 +0800
Subject: [PATCH 1/4] feat: implement OAuth for catalog rest client (#254)

---
 crates/catalog/rest/src/catalog.rs          | 118 +++++++++++++++++-
 .../catalog/rest/tests/rest_catalog_test.rs |   1 +
 2 files changed, 114 insertions(+), 5 deletions(-)

diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs
index 812ac828a..ae9ae105f 100644
--- a/crates/catalog/rest/src/catalog.rs
+++ b/crates/catalog/rest/src/catalog.rs
@@ -38,7 +38,7 @@ use iceberg::{
 
 use self::_serde::{
     CatalogConfig, ErrorResponse, ListNamespaceResponse, ListTableResponse, NamespaceSerde,
-    RenameTableRequest, NO_CONTENT, OK,
+    RenameTableRequest, TokenResponse, NO_CONTENT, OK,
 };
 
 const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1";
@@ -96,9 +96,13 @@ impl RestCatalogConfig {
             .join("/")
     }
 
+    fn get_token_endpoint(&self) -> String {
+        [&self.uri, PATH_V1, "oauth", "tokens"].join("/")
+    }
+
     fn try_create_rest_client(&self) -> Result<HttpClient> {
-        //TODO: We will add oauth, ssl config, sigv4 later
-        let headers = HeaderMap::from_iter([
+        // TODO: We will add ssl config, sigv4 later
+        let mut headers = HeaderMap::from_iter([
             (
                 header::CONTENT_TYPE,
                 HeaderValue::from_static("application/json"),
@@ -113,6 +117,19 @@
             ),
         ]);
 
+        if let Some(token) = self.props.get("token") {
+            headers.insert(
+                header::AUTHORIZATION,
+                HeaderValue::from_str(&format!("Bearer {token}")).map_err(|e| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        "Invalid token received from catalog server!",
+                    )
+                    .with_source(e)
+                })?,
+            );
+        }
+
         Ok(HttpClient(
             Client::builder().default_headers(headers).build()?,
         ))
@@ -144,6 +161,7 @@ impl HttpClient {
                 .with_source(e)
             })?)
         } else {
+            let code = resp.status();
             let text = resp.bytes().await?;
             let e = serde_json::from_slice::<ErrorResponse>(&text).map_err(|e| {
                 Error::new(
@@ -151,6 +169,7 @@ impl HttpClient {
                     "Failed to parse response from rest catalog server!",
                 )
                 .with_context("json", String::from_utf8_lossy(&text))
+                .with_context("code", code.to_string())
                 .with_source(e)
             })?;
             Err(e.into())
@@ -497,13 +516,56 @@ impl RestCatalog {
             client: config.try_create_rest_client()?,
             config,
         };
-
+        catalog.fetch_access_token().await?;
+        catalog.client = catalog.config.try_create_rest_client()?;
         catalog.update_config().await?;
         catalog.client = catalog.config.try_create_rest_client()?;
 
         Ok(catalog)
     }
 
+    async fn fetch_access_token(&mut self) -> Result<()> {
+        if self.config.props.contains_key("token") {
+            return Ok(());
+        }
+        if let Some(credential) = self.config.props.get("credential") {
+            let (client_id, client_secret) = if credential.contains(':') {
+                let (client_id, client_secret) = credential.split_once(':').unwrap();
+                (Some(client_id), client_secret)
+            } else {
+                (None, credential.as_str())
+            };
+            let mut params = HashMap::with_capacity(4);
+            params.insert("grant_type", "client_credentials");
+            if let Some(client_id) = client_id {
+                params.insert("client_id", client_id);
+            }
+            params.insert("client_secret", client_secret);
+            params.insert("scope", "catalog");
+            let req = self
+                .client
+                .0
+                .post(self.config.get_token_endpoint())
+                .form(&params)
+                .build()?;
+            let res = self
+                .client
+                .query::<TokenResponse>(req)
+                .await
+                .map_err(|e| {
+                    Error::new(
+                        ErrorKind::Unexpected,
+                        "Failed to fetch access token from catalog server!",
+                    )
+                    .with_source(e)
+                })?;
+            let token = res.access_token;
+            self.config.props.insert("token".to_string(), token);
+        }
+
+        Ok(())
+    }
+
     async fn update_config(&mut self) -> Result<()> {
         let mut request = self.client.0.get(self.config.config_endpoint());
@@ -626,6 +688,14 @@ mod _serde {
         }
     }
 
+    #[derive(Debug, Serialize, Deserialize)]
+    pub(super) struct TokenResponse {
+        pub(super) access_token: String,
+        pub(super) token_type: String,
+        pub(super) expires_in: Option<u64>,
+        pub(super) issued_token_type: Option<String>,
+    }
+
     #[derive(Debug, Serialize, Deserialize)]
     pub(super) struct NamespaceSerde {
         pub(super) namespace: Vec<String>,
@@ -778,6 +848,44 @@ mod tests {
             .await
     }
 
+    async fn create_oauth_mock(server: &mut ServerGuard) -> Mock {
+        server
+            .mock("POST", "/v1/oauth/tokens")
+            .with_status(200)
+            .with_body(
+                r#"{
+                "access_token": "ey000000000000",
+                "token_type": "Bearer",
+                "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
+                "expires_in": 86400
+                }"#,
+            )
+            .create_async()
+            .await
+    }
+
+    #[tokio::test]
+    async fn test_oauth() {
+        let mut server = Server::new_async().await;
+        let oauth_mock = create_oauth_mock(&mut server).await;
+        let config_mock = create_config_mock(&mut server).await;
+
+        let mut props = HashMap::new();
+        props.insert("credential".to_string(), "client1:secret1".to_string());
+
+        let _catalog = RestCatalog::new(
+            RestCatalogConfig::builder()
+                .uri(server.url())
+                .props(props)
+                .build(),
+        )
+        .await
+        .unwrap();
+
+        oauth_mock.assert_async().await;
+        config_mock.assert_async().await;
+    }
+
     #[tokio::test]
     async fn test_list_namespace() {
         let mut server = Server::new_async().await;
@@ -1557,7 +1665,7 @@ mod tests {
                 "type": "NoSuchTableException",
                 "code": 404
             }
-} 
+}
 "#,
             )
             .create_async()
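Editor's note (not part of the patch): `fetch_access_token` above is a standard OAuth2 client-credentials exchange (RFC 6749 §4.4) against the catalog's `/v1/oauth/tokens` endpoint. For orientation, here is a minimal self-contained sketch of the same request, assuming `reqwest` (with its `json` feature) and `serde` as dependencies; `fetch_token` and its `endpoint` parameter are illustrative names, not crate API:

```rust
use std::collections::HashMap;

use serde::Deserialize;

// Mirrors the patch's `TokenResponse` fields; names follow RFC 6749.
#[derive(Debug, Deserialize)]
struct TokenResponse {
    access_token: String,
    token_type: String,
    expires_in: Option<u64>,
}

// Hypothetical helper; the patch derives the endpoint as `{uri}/v1/oauth/tokens`.
async fn fetch_token(endpoint: &str, credential: &str) -> reqwest::Result<TokenResponse> {
    // The patch accepts either "client_id:client_secret" or a bare secret.
    let (client_id, client_secret) = match credential.split_once(':') {
        Some((id, secret)) => (Some(id), secret),
        None => (None, credential),
    };
    let mut params = HashMap::from([
        ("grant_type", "client_credentials"),
        ("client_secret", client_secret),
        ("scope", "catalog"),
    ]);
    if let Some(id) = client_id {
        params.insert("client_id", id);
    }
    // The token endpoint takes a form-encoded POST and returns JSON.
    reqwest::Client::new()
        .post(endpoint)
        .form(&params)
        .send()
        .await?
        .json::<TokenResponse>()
        .await
}
```

The patch itself reuses the catalog's existing `HttpClient` and `query` plumbing instead of a bare client, so the token fetch shares headers and error handling with every other REST call. The test-fixture change follows.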
diff --git a/crates/catalog/rest/tests/rest_catalog_test.rs b/crates/catalog/rest/tests/rest_catalog_test.rs
index a4d07955b..205428d61 100644
--- a/crates/catalog/rest/tests/rest_catalog_test.rs
+++ b/crates/catalog/rest/tests/rest_catalog_test.rs
@@ -66,6 +66,7 @@ async fn set_test_fixture(func: &str) -> TestFixture {
         rest_catalog,
     }
 }
+
 #[tokio::test]
 async fn test_get_non_exist_namespace() {
     let fixture = set_test_fixture("test_get_non_exist_namespace").await;
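Editor's note (not part of the patch): the body served by `create_oauth_mock` above is an RFC 6749-style token response, and it maps directly onto the new `_serde::TokenResponse` struct. A minimal sketch of that round-trip with `serde_json`, using a local copy of the struct for illustration:

```rust
use serde::Deserialize;

// Same shape as the patch's `_serde::TokenResponse`, reproduced for illustration.
#[derive(Debug, Deserialize)]
struct TokenResponse {
    access_token: String,
    token_type: String,
    expires_in: Option<u64>,
    issued_token_type: Option<String>,
}

fn main() {
    // The exact body served by `create_oauth_mock` in the test above.
    let body = r#"{
        "access_token": "ey000000000000",
        "token_type": "Bearer",
        "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
        "expires_in": 86400
    }"#;
    let res: TokenResponse = serde_json::from_str(body).unwrap();
    assert_eq!(res.access_token, "ey000000000000");
    assert_eq!(res.token_type, "Bearer");
    assert_eq!(res.expires_in, Some(86400));
    assert!(res.issued_token_type.is_some());
}
```

The two optional fields matter: some catalog servers omit `expires_in` and `issued_token_type`, so modeling them as `Option` keeps deserialization from failing on minimal responses.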
From 124a79be88ff26c3ea35faf7719c912283cadce7 Mon Sep 17 00:00:00 2001
From: Ruihang Xia
Date: Tue, 19 Mar 2024 11:47:21 +0800
Subject: [PATCH 2/4] docs: annotate precision and length to primitive types
 (#270)

Signed-off-by: Ruihang Xia
---
 crates/iceberg/src/spec/datatypes.rs | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/crates/iceberg/src/spec/datatypes.rs b/crates/iceberg/src/spec/datatypes.rs
index 6ea4175e5..494d2e8a3 100644
--- a/crates/iceberg/src/spec/datatypes.rs
+++ b/crates/iceberg/src/spec/datatypes.rs
@@ -193,15 +193,15 @@ pub enum PrimitiveType {
     },
     /// Calendar date without timezone or time.
     Date,
-    /// Time of day without date or timezone.
+    /// Time of day in microsecond precision, without date or timezone.
     Time,
-    /// Timestamp without timezone
+    /// Timestamp in microsecond precision, without timezone
     Timestamp,
-    /// Timestamp with timezone
+    /// Timestamp in microsecond precision, with timezone
     Timestamptz,
     /// Arbitrary-length character sequences encoded in utf-8
     String,
-    /// Universally Unique Identifiers
+    /// Universally Unique Identifiers, should use 16-byte fixed
     Uuid,
     /// Fixed length byte array
     Fixed(u64),
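Editor's note (not part of the patch): the doc comments above encode a spec-level invariant — per the Iceberg spec, `timestamp`/`timestamptz` values are microseconds since the Unix epoch and `time` values are microseconds since midnight, so nanosecond-precision sources must be truncated. A minimal sketch of that arithmetic in plain Rust:

```rust
fn main() {
    // Iceberg timestamps carry microsecond precision from the Unix epoch.
    let ts_micros: i64 = 1_710_728_400_123_456;

    // Split into whole seconds and the sub-second microsecond component.
    let secs = ts_micros.div_euclid(1_000_000);
    let sub_micros = ts_micros.rem_euclid(1_000_000);
    assert_eq!(secs * 1_000_000 + sub_micros, ts_micros);

    // A nanosecond-precision source must be truncated to fit the spec'd precision.
    let ts_nanos: i64 = 1_710_728_400_123_456_789;
    assert_eq!(ts_nanos / 1_000, ts_micros);

    // A valid `time` value is bounded by one day's worth of microseconds.
    let micros_per_day: i64 = 24 * 60 * 60 * 1_000_000;
    assert_eq!(micros_per_day, 86_400_000_000);
}
```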
From 76af8898028aa69ed4abc749c1af4b74776b910c Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Mon, 18 Mar 2024 22:22:27 -0700
Subject: [PATCH 3/4] build: Restore CI by making parquet and arrow version
 consistent (#280)

---
 Cargo.toml                                   | 16 ++++-----
 crates/iceberg/src/transform/temporal.rs     | 18 +++++-----
 .../src/writer/file_writer/parquet_writer.rs | 33 +++++--------------
 3 files changed, 24 insertions(+), 43 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 809fc4f9d..7da16e00d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,10 +18,10 @@
 [workspace]
 resolver = "2"
 members = [
-    "crates/catalog/*",
-    "crates/examples",
-    "crates/iceberg",
-    "crates/test_utils",
+  "crates/catalog/*",
+  "crates/examples",
+  "crates/iceberg",
+  "crates/test_utils",
 ]
@@ -37,9 +37,9 @@ rust-version = "1.75.0"
 anyhow = "1.0.72"
 apache-avro = "0.16"
 array-init = "2"
-arrow-arith = { version = ">=46" }
-arrow-array = { version = ">=46" }
-arrow-schema = { version = ">=46" }
+arrow-arith = { version = "51" }
+arrow-array = { version = "51" }
+arrow-schema = { version = "51" }
 async-stream = "0.3.5"
 async-trait = "0.1"
 bimap = "0.6"
@@ -61,7 +61,7 @@ murmur3 = "0.5.2"
 once_cell = "1"
 opendal = "0.45"
 ordered-float = "4.0.0"
-parquet = "50"
+parquet = "51"
 pilota = "0.10.0"
 pretty_assertions = "1.4.0"
 port_scanner = "0.1.5"
diff --git a/crates/iceberg/src/transform/temporal.rs b/crates/iceberg/src/transform/temporal.rs
index 7b8deb17d..4556543ae 100644
--- a/crates/iceberg/src/transform/temporal.rs
+++ b/crates/iceberg/src/transform/temporal.rs
@@ -17,10 +17,8 @@
 
 use super::TransformFunction;
 use crate::{Error, ErrorKind, Result};
-use arrow_arith::{
-    arity::binary,
-    temporal::{month_dyn, year_dyn},
-};
+use arrow_arith::temporal::DatePart;
+use arrow_arith::{arity::binary, temporal::date_part};
 use arrow_array::{
     types::Date32Type, Array, ArrayRef, Date32Array, Int32Array, TimestampMicrosecondArray,
 };
@@ -43,8 +41,8 @@ pub struct Year;
 
 impl TransformFunction for Year {
     fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
-        let array =
-            year_dyn(&input).map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?;
+        let array = date_part(&input, DatePart::Year)
+            .map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?;
         Ok(Arc::<Int32Array>::new(
             array
                 .as_any()
                 .downcast_ref::<Int32Array>()
                 .unwrap()
@@ -61,15 +59,15 @@ pub struct Month;
 
 impl TransformFunction for Month {
     fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
-        let year_array =
-            year_dyn(&input).map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?;
+        let year_array = date_part(&input, DatePart::Year)
+            .map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?;
         let year_array: Int32Array = year_array
             .as_any()
             .downcast_ref::<Int32Array>()
             .unwrap()
             .unary(|v| 12 * (v - UNIX_EPOCH_YEAR));
-        let month_array =
-            month_dyn(&input).map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?;
+        let month_array = date_part(&input, DatePart::Month)
+            .map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?;
         Ok(Arc::<Int32Array>::new(
             binary(
                 month_array.as_any().downcast_ref::<Int32Array>().unwrap(),
diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
index bb4550fab..3ec1a1b14 100644
--- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -18,7 +18,6 @@
 //! The module contains the file writer for parquet file format.
 
 use std::{
-    cmp::max,
     collections::HashMap,
     sync::{atomic::AtomicI64, Arc},
 };
@@ -43,9 +42,6 @@ use super::{
 
 /// ParquetWriterBuilder is used to builder a [`ParquetWriter`]
 #[derive(Clone)]
 pub struct ParquetWriterBuilder<T: LocationGenerator, F: FileNameGenerator> {
-    /// `buffer_size` determines the initial size of the intermediate buffer.
-    /// The intermediate buffer will automatically be resized if necessary
-    init_buffer_size: usize,
     props: WriterProperties,
     schema: ArrowSchemaRef,
@@ -55,13 +51,9 @@ pub struct ParquetWriterBuilder<T: LocationGenerator, F: FileNameGenerator> {
 
 impl<T: LocationGenerator, F: FileNameGenerator> ParquetWriterBuilder<T, F> {
-    /// To avoid EntiryTooSmall error, we set the minimum buffer size to 8MB if the given buffer size is smaller than it.
-    const MIN_BUFFER_SIZE: usize = 8 * 1024 * 1024;
-
     /// Create a new `ParquetWriterBuilder`
     /// To construct the write result, the schema should contain the `PARQUET_FIELD_ID_META_KEY` metadata for each field.
     pub fn new(
-        init_buffer_size: usize,
         props: WriterProperties,
         schema: ArrowSchemaRef,
         file_io: FileIO,
@@ -69,7 +61,6 @@ impl<T: LocationGenerator, F: FileNameGenerator> ParquetWriterBuilder<T, F> {
         file_name_generator: F,
     ) -> Self {
         Self {
-            init_buffer_size,
            props,
             schema,
             file_io,
@@ -112,20 +103,14 @@ impl<T: LocationGenerator, F: FileNameGenerator> FileWriterBuilder for ParquetWr
                 .generate_location(&self.file_name_generator.generate_file_name()),
         )?;
         let inner_writer = TrackWriter::new(out_file.writer().await?, written_size.clone());
-        let init_buffer_size = max(Self::MIN_BUFFER_SIZE, self.init_buffer_size);
-        let writer = AsyncArrowWriter::try_new(
-            inner_writer,
-            self.schema.clone(),
-            init_buffer_size,
-            Some(self.props),
-        )
-        .map_err(|err| {
-            Error::new(
-                crate::ErrorKind::Unexpected,
-                "Failed to build parquet writer.",
-            )
-            .with_source(err)
-        })?;
+        let writer = AsyncArrowWriter::try_new(inner_writer, self.schema.clone(), Some(self.props))
+            .map_err(|err| {
+                Error::new(
+                    crate::ErrorKind::Unexpected,
+                    "Failed to build parquet writer.",
+                )
+                .with_source(err)
+            })?;
 
         Ok(ParquetWriter {
             writer,
@@ -311,7 +296,6 @@ mod tests {
 
         // write data
         let mut pw = ParquetWriterBuilder::new(
-            0,
             WriterProperties::builder().build(),
             to_write.schema(),
             file_io.clone(),
@@ -551,7 +535,6 @@ mod tests {
 
         // write data
         let mut pw = ParquetWriterBuilder::new(
-            0,
             WriterProperties::builder().build(),
             to_write.schema(),
             file_io.clone(),
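Editor's note (not part of the patch): arrow 51 replaced the `year_dyn`/`month_dyn` kernels with the single `date_part` kernel used above, and `AsyncArrowWriter::try_new` dropped its buffer-size argument, which is why the parameter disappears from the parquet writer. A minimal sketch of the new `date_part` API, assuming `arrow-array` and `arrow-arith` at version 51:

```rust
use arrow_arith::temporal::{date_part, DatePart};
use arrow_array::{Array, Date32Array, Int32Array};

fn main() {
    // Date32 stores days since the Unix epoch; day 19000 falls in early 2022.
    let dates = Date32Array::from(vec![0, 19000]);

    // One kernel now covers every component year_dyn/month_dyn used to handle.
    let years = date_part(&dates, DatePart::Year).unwrap();
    let years = years.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(years.value(0), 1970);
    assert_eq!(years.value(1), 2022);

    let months = date_part(&dates, DatePart::Month).unwrap();
    let months = months.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(months.value(0), 1); // January, months are 1-based
}
```

Because `date_part` returns a type-erased `ArrayRef`, callers downcast to `Int32Array` exactly as the transform code above does.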
From 6f8545618dbc666b7117870e08057960246d8812 Mon Sep 17 00:00:00 2001
From: Marvin Lanhenke <62298609+marvinlanhenke@users.noreply.github.com>
Date: Tue, 19 Mar 2024 08:16:01 +0100
Subject: [PATCH 4/4] Metadata Serde + default partition_specs and sort_orders
 (#272)

* change serde metadata v2

* change default partition_specs and sort_orders

* change test

* use DEFAULTS
---
 crates/iceberg/src/spec/table_metadata.rs | 81 ++++++++++++++++-------
 1 file changed, 57 insertions(+), 24 deletions(-)

diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs
index 9893e9eea..4892c2623 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -29,6 +29,7 @@ use super::{
     snapshot::{Snapshot, SnapshotReference, SnapshotRetention},
     PartitionSpecRef, SchemaId, SchemaRef, SnapshotRef, SortOrderRef,
 };
+use super::{PartitionSpec, SortOrder};
 
 use _serde::TableMetadataEnum;
@@ -297,19 +298,37 @@ impl TableMetadataBuilder {
             properties,
         } = table_creation;
 
-        if partition_spec.is_some() {
-            return Err(Error::new(
-                ErrorKind::FeatureUnsupported,
-                "Can't create table with partition spec now",
-            ));
-        }
+        let partition_specs = match partition_spec {
+            Some(_) => {
+                return Err(Error::new(
+                    ErrorKind::FeatureUnsupported,
+                    "Can't create table with partition spec now",
+                ))
+            }
+            None => HashMap::from([(
+                DEFAULT_SPEC_ID,
+                Arc::new(PartitionSpec {
+                    spec_id: DEFAULT_SPEC_ID,
+                    fields: vec![],
+                }),
+            )]),
+        };
 
-        if sort_order.is_some() {
-            return Err(Error::new(
-                ErrorKind::FeatureUnsupported,
-                "Can't create table with sort order now",
-            ));
-        }
+        let sort_orders = match sort_order {
+            Some(_) => {
+                return Err(Error::new(
+                    ErrorKind::FeatureUnsupported,
+                    "Can't create table with sort order now",
+                ))
+            }
+            None => HashMap::from([(
+                DEFAULT_SORT_ORDER_ID,
+                Arc::new(SortOrder {
+                    order_id: DEFAULT_SORT_ORDER_ID,
+                    fields: vec![],
+                }),
+            )]),
+        };
 
         let table_metadata = TableMetadata {
             format_version: FormatVersion::V2,
@@ -325,16 +344,16 @@ impl TableMetadataBuilder {
             last_column_id: schema.highest_field_id(),
             current_schema_id: schema.schema_id(),
             schemas: HashMap::from([(schema.schema_id(), Arc::new(schema))]),
-            partition_specs: Default::default(),
-            default_spec_id: 0,
+            partition_specs,
+            default_spec_id: DEFAULT_SPEC_ID,
             last_partition_id: 0,
             properties,
             current_snapshot_id: None,
             snapshots: Default::default(),
             snapshot_log: vec![],
-            sort_orders: Default::default(),
+            sort_orders,
             metadata_log: vec![],
-            default_sort_order_id: 0,
+            default_sort_order_id: DEFAULT_SORT_ORDER_ID,
             refs: Default::default(),
         };
@@ -727,14 +746,10 @@ pub(super) mod _serde {
                 .collect(),
             default_spec_id: v.default_spec_id,
             last_partition_id: v.last_partition_id,
-            properties: if v.properties.is_empty() {
-                None
-            } else {
-                Some(v.properties)
-            },
+            properties: Some(v.properties),
             current_snapshot_id: v.current_snapshot_id.or(Some(-1)),
             snapshots: if v.snapshots.is_empty() {
-                None
+                Some(vec![])
             } else {
                 Some(
                     v.snapshots
@@ -1675,9 +1690,27 @@ mod tests {
                 .len(),
             0
         );
-        assert_eq!(table_metadata.partition_specs.len(), 0);
         assert_eq!(table_metadata.properties.len(), 0);
-        assert_eq!(table_metadata.sort_orders.len(), 0);
+        assert_eq!(
+            table_metadata.partition_specs,
+            HashMap::from([(
+                0,
+                Arc::new(PartitionSpec {
+                    spec_id: 0,
+                    fields: vec![]
+                })
+            )])
+        );
+        assert_eq!(
+            table_metadata.sort_orders,
+            HashMap::from([(
+                0,
+                Arc::new(SortOrder {
+                    order_id: 0,
+                    fields: vec![]
+                })
+            )])
+        );
     }
 
     #[test]
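Editor's note (not part of the patch): the net effect of the change above is that a freshly created table now carries an explicit unpartitioned spec and unsorted order under ID 0, rather than empty maps. A minimal sketch of the resulting shape, using simplified stand-ins for the crate's `PartitionSpec`/`SortOrder` types (the real field types are `PartitionField`/`SortField`):

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Simplified stand-ins, just to show the shape of the defaults.
#[derive(Debug, PartialEq)]
struct PartitionSpec {
    spec_id: i32,
    fields: Vec<String>, // really Vec<PartitionField>
}

#[derive(Debug, PartialEq)]
struct SortOrder {
    order_id: i64,
    fields: Vec<String>, // really Vec<SortField>
}

const DEFAULT_SPEC_ID: i32 = 0;
const DEFAULT_SORT_ORDER_ID: i64 = 0;

fn main() {
    // Mirrors the patch: an unpartitioned spec and an unsorted order under ID 0.
    let partition_specs = HashMap::from([(
        DEFAULT_SPEC_ID,
        Arc::new(PartitionSpec { spec_id: DEFAULT_SPEC_ID, fields: vec![] }),
    )]);
    let sort_orders = HashMap::from([(
        DEFAULT_SORT_ORDER_ID,
        Arc::new(SortOrder { order_id: DEFAULT_SORT_ORDER_ID, fields: vec![] }),
    )]);

    assert!(partition_specs[&DEFAULT_SPEC_ID].fields.is_empty());
    assert!(sort_orders[&DEFAULT_SORT_ORDER_ID].fields.is_empty());
}
```

Serializing these explicit defaults (together with `properties: Some(...)` and `snapshots: Some(vec![])` in the `_serde` change) keeps the emitted V2 metadata JSON closer to what other Iceberg implementations expect.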