diff --git a/soda/spark/soda/data_sources/spark_data_source.py b/soda/spark/soda/data_sources/spark_data_source.py index 018f00594..9de4569e3 100644 --- a/soda/spark/soda/data_sources/spark_data_source.py +++ b/soda/spark/soda/data_sources/spark_data_source.py @@ -26,6 +26,7 @@ def hive_connection_function( database: str, auth_method: str, kerberos_service_name: str, + scheme: str | None, **kwargs, ): """ @@ -64,6 +65,7 @@ def hive_connection_function( auth=auth_method, configuration=kwargs.get("configuration", {}), kerberos_service_name=kerberos_service_name, + scheme=scheme, ) return connection @@ -440,9 +442,14 @@ def __init__(self, logs: Logs, data_source_name: str, data_source_properties: di self.port = data_source_properties.get("port", "10000") self.username = data_source_properties.get("username") self.password = data_source_properties.get("password") - self.database = data_source_properties.get("catalog", "default") + # 20231114: fallback on database, which has been in the docs for a while + self.database = data_source_properties.get("catalog", getattr(self, "database", "default")) self.schema = data_source_properties.get("schema", "default") + + # Support both auth_method and authentication for backwards compatibility self.auth_method = data_source_properties.get("authentication", None) + self.auth_method = data_source_properties.get("auth_method", self.auth_method) + self.kerberos_service_name = data_source_properties.get("kerberos_service_name", None) self.configuration = data_source_properties.get("configuration", {}) self.driver = data_source_properties.get("driver", None) @@ -451,6 +458,7 @@ def __init__(self, logs: Logs, data_source_name: str, data_source_properties: di self.server_side_parameters = { f"SSP_{k}": f"{{{v}}}" for k, v in data_source_properties.get("server_side_parameters", {}) } + self.scheme = data_source_properties.get("scheme", "http") def connect(self): if self.method == SparkConnectionMethod.HIVE: @@ -479,6 +487,7 @@ def connect(self): cluster=self.cluster, server_side_parameters=self.server_side_parameters, configuration=self.configuration, + scheme=self.scheme, ) self.connection = connection