feat: client improvements & flaky tests (#23)
- Update the client mod to be more reusable
- Leverage the `SPARK_REMOTE` and `USER` environment variables
- Move `SparkSessionBuilder` to the `session` module
- Change the client's `mutex` to an `rwlock` (see the sketch after this list)
- Fix flaky tests
- Remove `arrow` as a re-export
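
Of these, the locking change benefits from a concrete picture. Below is a minimal sketch of the idea, not the crate's actual API (`SparkClient` and `ClientState` are hypothetical stand-ins, and the fallback values are assumed): reads of client state take a shared `RwLock` guard and can run concurrently, writes take an exclusive guard, and the remote endpoint and user name come from the `SPARK_REMOTE` and `USER` environment variables.

```rust
use std::env;
use std::sync::{Arc, RwLock};

// Hypothetical stand-in for the state the Spark Connect client guards.
#[derive(Default)]
struct ClientState {
    session_id: String,
}

// With an RwLock, many readers can inspect client state at once; only
// writers are serialized. A Mutex would serialize every access.
struct SparkClient {
    state: Arc<RwLock<ClientState>>,
}

impl SparkClient {
    fn new() -> Self {
        SparkClient {
            state: Arc::new(RwLock::new(ClientState::default())),
        }
    }

    // Read path: shared lock.
    fn session_id(&self) -> String {
        self.state.read().unwrap().session_id.clone()
    }

    // Write path: exclusive lock.
    fn set_session_id(&self, id: &str) {
        self.state.write().unwrap().session_id = id.to_string();
    }
}

fn main() {
    // Environment-variable handling as described in the bullet above;
    // the fallback values here are assumptions for illustration only.
    let remote =
        env::var("SPARK_REMOTE").unwrap_or_else(|_| "sc://localhost:15002".to_string());
    let user = env::var("USER").unwrap_or_else(|_| "unknown".to_string());

    let client = SparkClient::new();
    client.set_session_id("session-1");
    println!("{user} -> {remote}, session {}", client.session_id());
}
```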
sjrusso8 authored Apr 27, 2024
1 parent 418e926 commit 073e1fa
Showing 11 changed files with 314 additions and 274 deletions.
56 changes: 18 additions & 38 deletions src/catalog.rs
@@ -489,14 +489,18 @@ mod tests {
     async fn test_set_current_database() -> Result<(), SparkError> {
         let spark = setup().await;
 
+        spark.clone().sql("CREATE SCHEMA current_db").await?;
+
         spark
             .clone()
-            .sql("CREATE SCHEMA IF NOT EXISTS spark_rust_db")
+            .catalog()
+            .setCurrentDatabase("current_db")
             .await?;
 
-        spark.catalog().setCurrentDatabase("spark_rust_db").await?;
-
-        assert!(true);
+        spark.clone().sql("DROP SCHEMA current_db").await?;
 
         Ok(())
     }

@@ -513,41 +517,19 @@ mod tests {
 
         ()
     }
-    #[tokio::test]
-    async fn test_list_databases() -> Result<(), SparkError> {
-        let spark = setup().await;
-
-        spark
-            .clone()
-            .sql("CREATE SCHEMA IF NOT EXISTS spark_rust")
-            .await
-            .unwrap();
-
-        let res = spark.clone().catalog().listDatabases(None).await?;
-
-        assert_eq!(4, res.num_columns());
-        assert_eq!(2, res.num_rows());
-
-        let res = spark.catalog().listDatabases(Some("*rust")).await?;
-
-        assert_eq!(4, res.num_columns());
-        assert_eq!(1, res.num_rows());
-
-        Ok(())
-    }
 
     #[tokio::test]
     async fn test_get_database() -> Result<(), SparkError> {
         let spark = setup().await;
 
-        spark
-            .clone()
-            .sql("CREATE SCHEMA IF NOT EXISTS spark_rust")
-            .await?;
+        spark.clone().sql("CREATE SCHEMA get_db").await?;
 
-        let res = spark.catalog().getDatabase("spark_rust").await?;
+        let res = spark.clone().catalog().getDatabase("get_db").await?;
 
         assert_eq!(res.num_rows(), 1);
 
+        spark.clone().sql("DROP SCHEMA get_db").await?;
+
         Ok(())
     }

@@ -633,30 +615,28 @@ mod tests {
     async fn test_cache_table() -> Result<(), SparkError> {
         let spark = setup().await;
 
-        spark.clone().sql("DROP TABLE IF EXISTS tmp_table").await?;
-
         spark
             .clone()
-            .sql("CREATE TABLE tmp_table (name STRING, age INT) using parquet")
+            .sql("CREATE TABLE cache_table (name STRING, age INT) using parquet")
            .await?;
 
         spark
             .clone()
             .catalog()
-            .cacheTable("tmp_table", None)
+            .cacheTable("cache_table", None)
             .await?;
 
-        let res = spark.clone().catalog().isCached("tmp_table").await?;
+        let res = spark.clone().catalog().isCached("cache_table").await?;
 
         assert!(res);
 
-        spark.clone().catalog().uncacheTable("tmp_table").await?;
+        spark.clone().catalog().uncacheTable("cache_table").await?;
 
-        let res = spark.clone().catalog().isCached("tmp_table").await?;
+        let res = spark.clone().catalog().isCached("cache_table").await?;
 
         assert!(!res);
 
-        spark.sql("DROP TABLE IF EXISTS tmp_table").await?;
+        spark.sql("DROP TABLE cache_table").await?;
         Ok(())
     }
 }
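
The flaky-test fix visible in these hunks follows one pattern throughout the file: rather than sharing names like `spark_rust` or `tmp_table` across tests (and asserting on global database counts, as the deleted `test_list_databases` did), each test now creates an object named after itself, asserts against it alone, and drops it before returning. Reduced to its shape, using the new `test_get_database` from the diff above:

```rust
#[tokio::test]
async fn test_get_database() -> Result<(), SparkError> {
    let spark = setup().await;

    // 1. Create a schema whose name is unique to this test.
    spark.clone().sql("CREATE SCHEMA get_db").await?;

    // 2. Assert against that schema only, never against global counts
    //    that another concurrently running test could change.
    let res = spark.clone().catalog().getDatabase("get_db").await?;
    assert_eq!(res.num_rows(), 1);

    // 3. Drop it on the way out so no state leaks into other tests.
    spark.clone().sql("DROP SCHEMA get_db").await?;

    Ok(())
}
```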