Skip to content

Commit

Permalink
Merge branch 'main' into hms
Browse files Browse the repository at this point in the history
  • Loading branch information
marvinlanhenke committed Mar 19, 2024
2 parents 2ce1a6a + 6f85456 commit c04f3cf
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 58 deletions.
16 changes: 8 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
[workspace]
resolver = "2"
members = [
"crates/catalog/*",
"crates/examples",
"crates/iceberg",
"crates/test_utils",
"crates/catalog/*",
"crates/examples",
"crates/iceberg",
"crates/test_utils",
]

[workspace.package]
Expand All @@ -37,9 +37,9 @@ rust-version = "1.75.0"
anyhow = "1.0.72"
apache-avro = "0.16"
array-init = "2"
arrow-arith = { version = ">=46" }
arrow-array = { version = ">=46" }
arrow-schema = { version = ">=46" }
arrow-arith = { version = "51" }
arrow-array = { version = "51" }
arrow-schema = { version = "51" }
async-stream = "0.3.5"
async-trait = "0.1"
bimap = "0.6"
Expand All @@ -61,7 +61,7 @@ murmur3 = "0.5.2"
once_cell = "1"
opendal = "0.45"
ordered-float = "4.0.0"
parquet = "50"
parquet = "51"
pilota = "0.10.0"
pretty_assertions = "1.4.0"
port_scanner = "0.1.5"
Expand Down
118 changes: 113 additions & 5 deletions crates/catalog/rest/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use iceberg::{

use self::_serde::{
CatalogConfig, ErrorResponse, ListNamespaceResponse, ListTableResponse, NamespaceSerde,
RenameTableRequest, NO_CONTENT, OK,
RenameTableRequest, TokenResponse, NO_CONTENT, OK,
};

const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1";
Expand Down Expand Up @@ -96,9 +96,13 @@ impl RestCatalogConfig {
.join("/")
}

/// Builds the URL of the OAuth token endpoint for this catalog.
///
/// The result is the catalog `uri`, `PATH_V1`, `oauth` and `tokens`
/// joined with `/` separators.
fn get_token_endpoint(&self) -> String {
    format!("{}/{}/oauth/tokens", self.uri, PATH_V1)
}

fn try_create_rest_client(&self) -> Result<HttpClient> {
//TODO: We will add oauth, ssl config, sigv4 later
let headers = HeaderMap::from_iter([
// TODO: We will add ssl config, sigv4 later
let mut headers = HeaderMap::from_iter([
(
header::CONTENT_TYPE,
HeaderValue::from_static("application/json"),
Expand All @@ -113,6 +117,19 @@ impl RestCatalogConfig {
),
]);

if let Some(token) = self.props.get("token") {
headers.insert(
header::AUTHORIZATION,
HeaderValue::from_str(&format!("Bearer {token}")).map_err(|e| {
Error::new(
ErrorKind::DataInvalid,
"Invalid token received from catalog server!",
)
.with_source(e)
})?,
);
}

Ok(HttpClient(
Client::builder().default_headers(headers).build()?,
))
Expand Down Expand Up @@ -144,13 +161,15 @@ impl HttpClient {
.with_source(e)
})?)
} else {
let code = resp.status();
let text = resp.bytes().await?;
let e = serde_json::from_slice::<E>(&text).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
"Failed to parse response from rest catalog server!",
)
.with_context("json", String::from_utf8_lossy(&text))
.with_context("code", code.to_string())
.with_source(e)
})?;
Err(e.into())
Expand Down Expand Up @@ -497,13 +516,56 @@ impl RestCatalog {
client: config.try_create_rest_client()?,
config,
};

catalog.fetch_access_token().await?;
catalog.client = catalog.config.try_create_rest_client()?;
catalog.update_config().await?;
catalog.client = catalog.config.try_create_rest_client()?;

Ok(catalog)
}

/// Exchanges the configured `credential` property for an access token.
///
/// This is a no-op when a `token` property is already present, or when no
/// `credential` property is configured. Otherwise a form-encoded
/// `client_credentials` request is posted to the token endpoint and the
/// returned `access_token` is stored in the config properties under
/// `token`.
///
/// # Errors
///
/// Returns an error if the request cannot be built, or an
/// `ErrorKind::Unexpected` error (wrapping the underlying cause) if the
/// token request fails.
async fn fetch_access_token(&mut self) -> Result<()> {
    if self.config.props.contains_key("token") {
        return Ok(());
    }
    let Some(credential) = self.config.props.get("credential") else {
        return Ok(());
    };

    // A credential is either `<client_id>:<client_secret>` or a bare secret.
    // Splitting once handles both cases without a separate `contains` scan
    // or an `unwrap`.
    let (client_id, client_secret) = match credential.split_once(':') {
        Some((id, secret)) => (Some(id), secret),
        None => (None, credential.as_str()),
    };

    let mut params = HashMap::with_capacity(4);
    params.insert("grant_type", "client_credentials");
    if let Some(client_id) = client_id {
        params.insert("client_id", client_id);
    }
    params.insert("client_secret", client_secret);
    params.insert("scope", "catalog");

    let req = self
        .client
        .0
        .post(self.config.get_token_endpoint())
        .form(&params)
        .build()?;
    let res = self
        .client
        .query::<TokenResponse, ErrorResponse, OK>(req)
        .await
        .map_err(|e| {
            Error::new(
                ErrorKind::Unexpected,
                "Failed to fetch access token from catalog server!",
            )
            .with_source(e)
        })?;

    // Only the properties are updated here; the HTTP client itself is not
    // rebuilt by this method.
    self.config
        .props
        .insert("token".to_string(), res.access_token);

    Ok(())
}

async fn update_config(&mut self) -> Result<()> {
let mut request = self.client.0.get(self.config.config_endpoint());

Expand Down Expand Up @@ -626,6 +688,14 @@ mod _serde {
}
}

/// JSON body returned by the OAuth token endpoint on success.
#[derive(Debug, Serialize, Deserialize)]
pub(super) struct TokenResponse {
/// The issued access token; stored by the catalog under the `token` property.
pub(super) access_token: String,
/// Token type reported by the server (e.g. `Bearer` in the test fixture).
pub(super) token_type: String,
/// Optional lifetime of the token. NOTE(review): presumably seconds — confirm against the REST spec.
pub(super) expires_in: Option<u64>,
/// Optional identifier of the kind of token that was issued.
pub(super) issued_token_type: Option<String>,
}

#[derive(Debug, Serialize, Deserialize)]
pub(super) struct NamespaceSerde {
pub(super) namespace: Vec<String>,
Expand Down Expand Up @@ -778,6 +848,44 @@ mod tests {
.await
}

/// Registers a mock for `POST /v1/oauth/tokens` on `server` that answers
/// with HTTP 200 and a fixed JSON token payload, and returns the mock so
/// callers can assert that it was hit.
async fn create_oauth_mock(server: &mut ServerGuard) -> Mock {
server
.mock("POST", "/v1/oauth/tokens")
.with_status(200)
.with_body(
r#"{
"access_token": "ey000000000000",
"token_type": "Bearer",
"issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
"expires_in": 86400
}"#,
)
.create_async()
.await
}

/// Constructing a catalog with only a `credential` property must call both
/// the OAuth token endpoint and the config endpoint.
#[tokio::test]
async fn test_oauth() {
    let mut server = Server::new_async().await;
    let token_endpoint = create_oauth_mock(&mut server).await;
    let config_endpoint = create_config_mock(&mut server).await;

    let props = HashMap::from([("credential".to_string(), "client1:secret1".to_string())]);
    let config = RestCatalogConfig::builder()
        .uri(server.url())
        .props(props)
        .build();

    // Keep the catalog alive until the mocks have been checked.
    let _catalog = RestCatalog::new(config).await.unwrap();

    token_endpoint.assert_async().await;
    config_endpoint.assert_async().await;
}

#[tokio::test]
async fn test_list_namespace() {
let mut server = Server::new_async().await;
Expand Down Expand Up @@ -1557,7 +1665,7 @@ mod tests {
"type": "NoSuchTableException",
"code": 404
}
}
}
"#,
)
.create_async()
Expand Down
1 change: 1 addition & 0 deletions crates/catalog/rest/tests/rest_catalog_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ async fn set_test_fixture(func: &str) -> TestFixture {
rest_catalog,
}
}

#[tokio::test]
async fn test_get_non_exist_namespace() {
let fixture = set_test_fixture("test_get_non_exist_namespace").await;
Expand Down
8 changes: 4 additions & 4 deletions crates/iceberg/src/spec/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,15 +193,15 @@ pub enum PrimitiveType {
},
/// Calendar date without timezone or time.
Date,
/// Time of day without date or timezone.
/// Time of day in microsecond precision, without date or timezone.
Time,
/// Timestamp without timezone
/// Timestamp in microsecond precision, without timezone
Timestamp,
/// Timestamp with timezone
/// Timestamp in microsecond precision, with timezone
Timestamptz,
/// Arbitrary-length character sequences encoded in utf-8
String,
/// Universally Unique Identifiers
/// Universally Unique Identifiers, should use 16-byte fixed
Uuid,
/// Fixed length byte array
Fixed(u64),
Expand Down
12 changes: 6 additions & 6 deletions crates/iceberg/src/spec/table_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,9 @@ impl TableMetadataBuilder {
))
}
None => HashMap::from([(
0,
DEFAULT_SPEC_ID,
Arc::new(PartitionSpec {
spec_id: 0,
spec_id: DEFAULT_SPEC_ID,
fields: vec![],
}),
)]),
Expand All @@ -322,9 +322,9 @@ impl TableMetadataBuilder {
))
}
None => HashMap::from([(
0,
DEFAULT_SORT_ORDER_ID,
Arc::new(SortOrder {
order_id: 0,
order_id: DEFAULT_SORT_ORDER_ID,
fields: vec![],
}),
)]),
Expand All @@ -345,15 +345,15 @@ impl TableMetadataBuilder {
current_schema_id: schema.schema_id(),
schemas: HashMap::from([(schema.schema_id(), Arc::new(schema))]),
partition_specs,
default_spec_id: 0,
default_spec_id: DEFAULT_SPEC_ID,
last_partition_id: 0,
properties,
current_snapshot_id: None,
snapshots: Default::default(),
snapshot_log: vec![],
sort_orders,
metadata_log: vec![],
default_sort_order_id: 0,
default_sort_order_id: DEFAULT_SORT_ORDER_ID,
refs: Default::default(),
};

Expand Down
18 changes: 8 additions & 10 deletions crates/iceberg/src/transform/temporal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@

use super::TransformFunction;
use crate::{Error, ErrorKind, Result};
use arrow_arith::{
arity::binary,
temporal::{month_dyn, year_dyn},
};
use arrow_arith::temporal::DatePart;
use arrow_arith::{arity::binary, temporal::date_part};
use arrow_array::{
types::Date32Type, Array, ArrayRef, Date32Array, Int32Array, TimestampMicrosecondArray,
};
Expand All @@ -43,8 +41,8 @@ pub struct Year;

impl TransformFunction for Year {
fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
let array =
year_dyn(&input).map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?;
let array = date_part(&input, DatePart::Year)
.map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?;
Ok(Arc::<Int32Array>::new(
array
.as_any()
Expand All @@ -61,15 +59,15 @@ pub struct Month;

impl TransformFunction for Month {
fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
let year_array =
year_dyn(&input).map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?;
let year_array = date_part(&input, DatePart::Year)
.map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?;
let year_array: Int32Array = year_array
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.unary(|v| 12 * (v - UNIX_EPOCH_YEAR));
let month_array =
month_dyn(&input).map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?;
let month_array = date_part(&input, DatePart::Month)
.map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?;
Ok(Arc::<Int32Array>::new(
binary(
month_array.as_any().downcast_ref::<Int32Array>().unwrap(),
Expand Down
Loading

0 comments on commit c04f3cf

Please sign in to comment.