Skip to content

Commit

Permalink
keep file_io in HmsCatalog
Browse files Browse the repository at this point in the history
  • Loading branch information
marvinlanhenke committed Mar 20, 2024
1 parent c765268 commit 98a2f27
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 57 deletions.
33 changes: 20 additions & 13 deletions crates/catalog/hms/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ pub enum HmsThriftTransport {
pub struct HmsCatalogConfig {
address: String,
thrift_transport: HmsThriftTransport,
#[builder(default, setter(strip_option))]
warehouse: Option<String>,
warehouse: String,
#[builder(default)]
props: HashMap<String, String>,
}
Expand All @@ -68,6 +67,7 @@ struct HmsClient(ThriftHiveMetastoreClient);
pub struct HmsCatalog {
config: HmsCatalogConfig,
client: HmsClient,
file_io: FileIO,
}

impl Debug for HmsCatalog {
Expand Down Expand Up @@ -105,11 +105,20 @@ impl HmsCatalog {
.build(),
};

let file_io = FileIO::from_path(&config.warehouse)?
.with_props(&config.props)
.build()?;

Ok(Self {
config,
client: HmsClient(client),
file_io,
})
}
/// Get the catalogs `FileIO`
pub fn file_io(&self) -> FileIO {
self.file_io.clone()
}
}

#[async_trait]
Expand Down Expand Up @@ -333,17 +342,18 @@ impl Catalog for HmsCatalog {
Some(location) => location.clone(),
None => {
let ns = self.get_namespace(namespace).await?;
get_default_table_location(&ns, &table_name, self.config.warehouse.clone())?
get_default_table_location(&ns, &table_name, &self.config.warehouse)
}
};

let metadata = TableMetadataBuilder::from_table_creation(creation)?.build()?;
let metadata_location = create_metadata_location(&location, 0)?;

let file_io = FileIO::from_path(&metadata_location)?
.with_props(&self.config.props)
.build()?;
let mut file = file_io.new_output(&metadata_location)?.writer().await?;
let mut file = self
.file_io
.new_output(&metadata_location)?
.writer()
.await?;
file.write_all(&serde_json::to_vec(&metadata)?).await?;
file.shutdown().await?;

Expand All @@ -363,7 +373,7 @@ impl Catalog for HmsCatalog {
.map_err(from_thrift_error)?;

let table = Table::builder()
.file_io(file_io)
.file_io(self.file_io())
.metadata_location(metadata_location)
.metadata(metadata)
.identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
Expand Down Expand Up @@ -396,16 +406,13 @@ impl Catalog for HmsCatalog {

let metadata_location = get_metadata_location(&hive_table.parameters)?;

let file_io = FileIO::from_path(&metadata_location)?
.with_props(&self.config.props)
.build()?;
let mut reader = file_io.new_input(&metadata_location)?.reader().await?;
let mut reader = self.file_io.new_input(&metadata_location)?.reader().await?;
let mut metadata_str = String::new();
reader.read_to_string(&mut metadata_str).await?;
let metadata = serde_json::from_str::<TableMetadata>(&metadata_str)?;

let table = Table::builder()
.file_io(file_io)
.file_io(self.file_io())
.metadata_location(metadata_location)
.metadata(metadata)
.identifier(TableIdent::new(
Expand Down
40 changes: 7 additions & 33 deletions crates/catalog/hms/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,24 +231,16 @@ pub(crate) fn validate_namespace(namespace: &NamespaceIdent) -> Result<String> {
pub(crate) fn get_default_table_location(
namespace: &Namespace,
table_name: impl AsRef<str>,
warehouse: Option<String>,
) -> Result<String> {
warehouse: impl AsRef<str>,
) -> String {
let properties = namespace.properties();

let location = match properties.get(LOCATION) {
Some(location) => location.to_string(),
None => match warehouse {
Some(location) => location,
None => {
return Err(Error::new(
ErrorKind::DataInvalid,
"No default path is set, please specify a location when creating a table",
))
}
},
Some(location) => location,
None => warehouse.as_ref(),
};

Ok(format!("{}/{}", location, table_name.as_ref()))
format!("{}/{}", location, table_name.as_ref())
}

/// Create metadata location from `location` and `version`
Expand Down Expand Up @@ -443,11 +435,7 @@ mod tests {
let table_name = "my_table";

let expected = "db_location/my_table";
let result = get_default_table_location(
&namespace,
table_name,
Some("warehouse_location".to_string()),
)?;
let result = get_default_table_location(&namespace, table_name, "warehouse_location");

assert_eq!(expected, result);

Expand All @@ -460,27 +448,13 @@ mod tests {
let table_name = "my_table";

let expected = "warehouse_location/my_table";
let result = get_default_table_location(
&namespace,
table_name,
Some("warehouse_location".to_string()),
)?;
let result = get_default_table_location(&namespace, table_name, "warehouse_location");

assert_eq!(expected, result);

Ok(())
}

#[test]
fn test_get_default_table_location_missing() {
let namespace = Namespace::new(NamespaceIdent::new("default".into()));
let table_name = "my_table";

let result = get_default_table_location(&namespace, table_name, None);

assert!(result.is_err());
}

#[test]
fn test_convert_to_namespace() -> Result<()> {
let properties = HashMap::from([
Expand Down
14 changes: 3 additions & 11 deletions crates/catalog/hms/tests/hms_catalog_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@
use std::collections::HashMap;

use iceberg::io::{
FileIO, FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY,
};
use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation, TableIdent};
use iceberg_catalog_hms::{HmsCatalog, HmsCatalogConfig, HmsThriftTransport};
Expand All @@ -37,7 +35,6 @@ type Result<T> = std::result::Result<T, iceberg::Error>;
struct TestFixture {
_docker_compose: DockerCompose,
hms_catalog: HmsCatalog,
file_io: FileIO,
}

async fn set_test_fixture(func: &str) -> TestFixture {
Expand Down Expand Up @@ -73,11 +70,6 @@ async fn set_test_fixture(func: &str) -> TestFixture {
(S3_REGION.to_string(), "us-east-1".to_string()),
]);

let file_io = FileIOBuilder::new("s3a")
.with_props(&props)
.build()
.unwrap();

let config = HmsCatalogConfig::builder()
.address(format!("{}:{}", hms_catalog_ip, HMS_CATALOG_PORT))
.thrift_transport(HmsThriftTransport::Buffered)
Expand All @@ -90,7 +82,6 @@ async fn set_test_fixture(func: &str) -> TestFixture {
TestFixture {
_docker_compose: docker_compose,
hms_catalog,
file_io,
}
}

Expand Down Expand Up @@ -219,7 +210,8 @@ async fn test_create_table() -> Result<()> {
.is_some_and(|location| location.starts_with("s3a://warehouse/hive/metadata/00000-")));
assert!(
fixture
.file_io
.hms_catalog
.file_io()
.is_exist("s3a://warehouse/hive/metadata/")
.await?
);
Expand Down

0 comments on commit 98a2f27

Please sign in to comment.