feat(spark-connector): support JDBC catalog (#6212)

### What changes were proposed in this pull request?
Support JDBC catalog in Spark Connector.

### Why are the changes needed?
Fix: #1572 #6164

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
TestJdbcPropertiesConverter, SparkJdbcCatalogIT
1 parent 792ded1, commit ab85969. Showing 29 changed files with 957 additions and 17 deletions.
---
title: "Spark connector JDBC catalog"
slug: /spark-connector/spark-catalog-jdbc
keyword: spark connector jdbc catalog
license: "This software is licensed under the Apache License version 2."
---

The Apache Gravitino Spark connector offers the capability to read and write JDBC tables, with the metadata managed by the Gravitino server. To use the JDBC catalog within the Spark connector, you must add the JDBC driver JAR for your database to the Spark classpath.
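As a minimal sketch of putting the driver on the classpath, you might launch `spark-sql` as below. The connector JAR name, driver JAR version, and the `spark.sql.gravitino.*` configuration keys here are assumptions based on the general Gravitino Spark connector setup, not taken from this commit; adjust them to your deployment.

```shell
# Launch spark-sql with the Gravitino Spark connector and the MySQL JDBC
# driver on the classpath (JAR paths/versions are placeholders).
spark-sql \
  --jars /path/to/gravitino-spark-connector-runtime.jar,/path/to/mysql-connector-j-8.0.33.jar \
  --conf spark.plugins=org.apache.gravitino.spark.connector.plugin.GravitinoSparkPlugin \
  --conf spark.sql.gravitino.uri=http://localhost:8090 \
  --conf spark.sql.gravitino.metalake=test
```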
## Capabilities

Supports MySQL and PostgreSQL. OceanBase, which is compatible with the MySQL dialect, can use the MySQL driver and dialect as a workaround. Doris, which does not support the MySQL dialect, is not currently supported.

#### Supported DML and DDL operations

- `CREATE TABLE`
- `DROP TABLE`
- `ALTER TABLE`
- `SELECT`
- `INSERT`

:::info
JDBCTable does not support distributed transactions. When writing data to an RDBMS, each task runs as an independent transaction. If some Spark tasks succeed and others fail, dirty data may be generated.
:::

#### Unsupported operations

- `UPDATE`
- `DELETE`
- `TRUNCATE`
||
## SQL example | ||
|
||
```sql | ||
-- Suppose mysql_a is the mysql catalog name managed by Gravitino | ||
USE mysql_a; | ||
|
||
CREATE DATABASE IF NOT EXISTS mydatabase; | ||
USE mydatabase; | ||
|
||
CREATE TABLE IF NOT EXISTS employee ( | ||
id bigint, | ||
name string, | ||
department string, | ||
hire_date timestamp | ||
) | ||
DESC TABLE EXTENDED employee; | ||
|
||
INSERT INTO employee | ||
VALUES | ||
(1, 'Alice', 'Engineering', TIMESTAMP '2021-01-01 09:00:00'), | ||
(2, 'Bob', 'Marketing', TIMESTAMP '2021-02-01 10:30:00'), | ||
(3, 'Charlie', 'Sales', TIMESTAMP '2021-03-01 08:45:00'); | ||
|
||
SELECT * FROM employee WHERE date(hire_date) = '2021-01-01'; | ||
|
||
|
||
``` | ||
|
||
## Catalog properties

The Gravitino Spark connector transforms the following property names, defined in the catalog properties, into the Spark JDBC connector configuration.

| Gravitino catalog property name | Spark JDBC connector configuration | Description | Since Version |
|---------------------------------|------------------------------------|-------------|---------------|
| `jdbc-url` | `url` | JDBC URL for connecting to the database. For example, `jdbc:mysql://localhost:3306` | 0.3.0 |
| `jdbc-user` | `user` | JDBC user name | 0.3.0 |
| `jdbc-password` | `password` | JDBC password | 0.3.0 |
| `jdbc-driver` | `driver` | The driver class of the JDBC connection. For example, `com.mysql.jdbc.Driver` or `com.mysql.cj.jdbc.Driver` | 0.3.0 |

Gravitino catalog property names with the prefix `spark.bypass.` are passed to the Spark JDBC connector.
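These catalog properties are supplied when the JDBC catalog is created on the Gravitino server. As a hedged sketch only (the REST endpoint shape, the `jdbc-mysql` provider name, and the metalake name are assumptions based on Gravitino's catalog API, not taken from this commit), creating a MySQL JDBC catalog might look like:

```shell
# Create a MySQL JDBC catalog in the metalake "test" (endpoint, provider
# name, and credentials are placeholders; adjust to your deployment).
curl -X POST -H "Content-Type: application/json" \
  http://localhost:8090/api/metalakes/test/catalogs \
  -d '{
    "name": "mysql_a",
    "type": "RELATIONAL",
    "provider": "jdbc-mysql",
    "comment": "MySQL catalog managed by Gravitino",
    "properties": {
      "jdbc-url": "jdbc:mysql://localhost:3306",
      "jdbc-user": "user",
      "jdbc-password": "password",
      "jdbc-driver": "com.mysql.cj.jdbc.Driver"
    }
  }'
```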
...-common/src/main/java/org/apache/gravitino/spark/connector/jdbc/GravitinoJdbcCatalog.java (108 additions, 0 deletions)
```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.gravitino.spark.connector.jdbc;

import com.google.common.collect.Maps;
import java.util.Map;
import org.apache.gravitino.spark.connector.PropertiesConverter;
import org.apache.gravitino.spark.connector.SparkTransformConverter;
import org.apache.gravitino.spark.connector.SparkTypeConverter;
import org.apache.gravitino.spark.connector.catalog.BaseCatalog;
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.errors.QueryCompilationErrors;
import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTable;
import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

public class GravitinoJdbcCatalog extends BaseCatalog {

  @Override
  protected TableCatalog createAndInitSparkCatalog(
      String name, CaseInsensitiveStringMap options, Map<String, String> properties) {
    JDBCTableCatalog jdbcTableCatalog = new JDBCTableCatalog();
    Map<String, String> all =
        getPropertiesConverter().toSparkCatalogProperties(options, properties);
    jdbcTableCatalog.initialize(name, new CaseInsensitiveStringMap(all));
    return jdbcTableCatalog;
  }

  @Override
  protected Table createSparkTable(
      Identifier identifier,
      org.apache.gravitino.rel.Table gravitinoTable,
      Table sparkTable,
      TableCatalog sparkCatalog,
      PropertiesConverter propertiesConverter,
      SparkTransformConverter sparkTransformConverter,
      SparkTypeConverter sparkTypeConverter) {
    return new SparkJdbcTable(
        identifier,
        gravitinoTable,
        (JDBCTable) sparkTable,
        (JDBCTableCatalog) sparkCatalog,
        propertiesConverter,
        sparkTransformConverter,
        sparkTypeConverter);
  }

  @Override
  protected PropertiesConverter getPropertiesConverter() {
    return JdbcPropertiesConverter.getInstance();
  }

  @Override
  protected SparkTransformConverter getSparkTransformConverter() {
    return new SparkTransformConverter(false);
  }

  @Override
  protected SparkTypeConverter getSparkTypeConverter() {
    return new SparkJdbcTypeConverter();
  }

  @Override
  public void createNamespace(String[] namespace, Map<String, String> metadata)
      throws NamespaceAlreadyExistsException {
    Map<String, String> properties = Maps.newHashMap();
    if (!metadata.isEmpty()) {
      metadata.forEach(
          (k, v) -> {
            switch (k) {
              case SupportsNamespaces.PROP_COMMENT:
                properties.put(k, v);
                break;
              case SupportsNamespaces.PROP_OWNER:
                break;
              case SupportsNamespaces.PROP_LOCATION:
                throw new RuntimeException(
                    QueryCompilationErrors.cannotCreateJDBCNamespaceUsingProviderError());
              default:
                throw new RuntimeException(
                    QueryCompilationErrors.cannotCreateJDBCNamespaceWithPropertyError(k));
            }
          });
    }
    super.createNamespace(namespace, properties);
  }
}
```
...mmon/src/main/java/org/apache/gravitino/spark/connector/jdbc/JdbcPropertiesConstants.java (33 additions, 0 deletions)
```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.gravitino.spark.connector.jdbc;

public class JdbcPropertiesConstants {

  public static final String GRAVITINO_JDBC_USER = "jdbc-user";
  public static final String GRAVITINO_JDBC_PASSWORD = "jdbc-password";
  public static final String GRAVITINO_JDBC_DRIVER = "jdbc-driver";
  public static final String GRAVITINO_JDBC_URL = "jdbc-url";

  public static final String SPARK_JDBC_URL = "url";
  public static final String SPARK_JDBC_USER = "user";
  public static final String SPARK_JDBC_PASSWORD = "password";
  public static final String SPARK_JDBC_DRIVER = "driver";
}
```
...mmon/src/main/java/org/apache/gravitino/spark/connector/jdbc/JdbcPropertiesConverter.java (73 additions, 0 deletions)
```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.gravitino.spark.connector.jdbc;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.Map;
import org.apache.gravitino.spark.connector.PropertiesConverter;

public class JdbcPropertiesConverter implements PropertiesConverter {

  public static class JdbcPropertiesConverterHolder {
    private static final JdbcPropertiesConverter INSTANCE = new JdbcPropertiesConverter();
  }

  private JdbcPropertiesConverter() {}

  public static JdbcPropertiesConverter getInstance() {
    return JdbcPropertiesConverterHolder.INSTANCE;
  }

  private static final Map<String, String> GRAVITINO_CONFIG_TO_JDBC =
      ImmutableMap.of(
          JdbcPropertiesConstants.GRAVITINO_JDBC_URL,
          JdbcPropertiesConstants.SPARK_JDBC_URL,
          JdbcPropertiesConstants.GRAVITINO_JDBC_USER,
          JdbcPropertiesConstants.SPARK_JDBC_USER,
          JdbcPropertiesConstants.GRAVITINO_JDBC_PASSWORD,
          JdbcPropertiesConstants.SPARK_JDBC_PASSWORD,
          JdbcPropertiesConstants.GRAVITINO_JDBC_DRIVER,
          JdbcPropertiesConstants.SPARK_JDBC_DRIVER);

  @Override
  public Map<String, String> toSparkCatalogProperties(Map<String, String> properties) {
    Preconditions.checkArgument(properties != null, "Jdbc Catalog properties should not be null");
    HashMap<String, String> jdbcProperties = new HashMap<>();
    properties.forEach(
        (key, value) -> {
          if (GRAVITINO_CONFIG_TO_JDBC.containsKey(key)) {
            jdbcProperties.put(GRAVITINO_CONFIG_TO_JDBC.get(key), value);
          }
        });
    return jdbcProperties;
  }

  @Override
  public Map<String, String> toGravitinoTableProperties(Map<String, String> properties) {
    return new HashMap<>(properties);
  }

  @Override
  public Map<String, String> toSparkTableProperties(Map<String, String> properties) {
    return new HashMap<>(properties);
  }
}
```
.../spark-common/src/main/java/org/apache/gravitino/spark/connector/jdbc/SparkJdbcTable.java (71 additions, 0 deletions)
```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.gravitino.spark.connector.jdbc;

import java.util.Map;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.spark.connector.PropertiesConverter;
import org.apache.gravitino.spark.connector.SparkTransformConverter;
import org.apache.gravitino.spark.connector.SparkTypeConverter;
import org.apache.gravitino.spark.connector.utils.GravitinoTableInfoHelper;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTable;
import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog;
import org.apache.spark.sql.types.StructType;

public class SparkJdbcTable extends JDBCTable {

  private GravitinoTableInfoHelper gravitinoTableInfoHelper;

  public SparkJdbcTable(
      Identifier identifier,
      Table gravitinoTable,
      JDBCTable jdbcTable,
      JDBCTableCatalog jdbcTableCatalog,
      PropertiesConverter propertiesConverter,
      SparkTransformConverter sparkTransformConverter,
      SparkTypeConverter sparkTypeConverter) {
    super(identifier, jdbcTable.schema(), jdbcTable.jdbcOptions());
    this.gravitinoTableInfoHelper =
        new GravitinoTableInfoHelper(
            false,
            identifier,
            gravitinoTable,
            propertiesConverter,
            sparkTransformConverter,
            sparkTypeConverter);
  }

  @Override
  public String name() {
    return gravitinoTableInfoHelper.name();
  }

  @Override
  @SuppressWarnings("deprecation")
  public StructType schema() {
    return gravitinoTableInfoHelper.schema();
  }

  @Override
  public Map<String, String> properties() {
    return gravitinoTableInfoHelper.properties();
  }
}
```