Merge branch 'dev' into hotfix-hbase-source-problem
Hisoka-X authored Jul 10, 2024
2 parents 5a1bbdc + 57e5627 commit f58b433
Showing 34 changed files with 714 additions and 70 deletions.
6 changes: 2 additions & 4 deletions .github/workflows/backend.yml
@@ -1052,7 +1052,7 @@ jobs:

kafka-connector-it:
needs: [ changes, sanity-check ]
if: needs.changes.outputs.api == 'true'
if: needs.changes.outputs.api == 'true' || contains(needs.changes.outputs.it-modules, 'connector-kafka-e2e')
runs-on: ${{ matrix.os }}
strategy:
matrix:
@@ -1068,15 +1068,14 @@
distribution: 'temurin'
cache: 'maven'
- name: run kafka connector integration test
if: needs.changes.outputs.api == 'true'
run: |
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-kafka-e2e -am -Pci
env:
MAVEN_OPTS: -Xmx4096m

rocketmq-connector-it:
needs: [ changes, sanity-check ]
if: needs.changes.outputs.api == 'true'
if: needs.changes.outputs.api == 'true' || contains(needs.changes.outputs.it-modules, 'connector-rocketmq-e2e')
runs-on: ${{ matrix.os }}
strategy:
matrix:
@@ -1092,7 +1091,6 @@
distribution: 'temurin'
cache: 'maven'
- name: run rocket connector integration test
if: needs.changes.outputs.api == 'true'
run: |
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-rocketmq-e2e -am -Pci
env:
1 change: 1 addition & 0 deletions docs/en/concept/config.md
@@ -315,6 +315,7 @@ Some Notes:
- quote the value with `'` if it contains special characters (like `(`)
- if the replacement variable is wrapped in `"` or `'`, like `resName` and `nameVal`, you need to add `"`
- the value can't contain spaces `' '`; for example, `-i jobName='this is a job name'` will be replaced with `job.name = "this"`
- If you want to use dynamic parameters, you can use the following format: `-i date=$(date +"%Y%m%d")` (see the sketch below).
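
For illustration, a minimal sketch of the dynamic-parameter form, assuming a job config `./config/my_job.conf` that references `${date}` somewhere (for example `path = "/data/${date}"`; both the path and the config file are placeholders, not files from this repository). The shell expands `$(date +"%Y%m%d")` before `seatunnel.sh` runs, so SeaTunnel simply receives an ordinary literal such as `date=20240710`:

```shell
# Hypothetical config path; the job file is assumed to contain a ${date} reference.
./bin/seatunnel.sh -c ./config/my_job.conf \
  -i date=$(date +"%Y%m%d") \
  -e local
```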

## What's More

2 changes: 1 addition & 1 deletion docs/en/connector-v2/formats/avro.md
@@ -77,7 +77,7 @@ source {
bootstrap.servers = "kafkaCluster:9092"
topic = "test_avro_topic"
result_table_name = "kafka_table"
kafka.auto.offset.reset = "earliest"
start_mode = "earliest"
format = avro
format_error_handle_way = skip
schema = {
13 changes: 13 additions & 0 deletions docs/en/connector-v2/sink/Hbase.md
@@ -116,7 +116,20 @@ Hbase {
all_columns = seatunnel
}
}
```

## Writes To The Specified Column Family

```hocon
Hbase {
zookeeper_quorum = "hbase_e2e:2181"
table = "assign_cf_table"
rowkey_column = ["id"]
family_name {
c_double = "cf1"
c_bigint = "cf2"
}
}
```
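
As a quick spot check (a sketch, assuming the `assign_cf_table` example above has been run and the `hbase shell` CLI is available on the cluster), scanning the table should show `c_double` stored under `cf1` and `c_bigint` under `cf2`; columns that are not mapped in `family_name` are skipped by the writer change in this commit:

```shell
# Scan the example table and inspect which column family each qualifier landed in.
echo "scan 'assign_cf_table'" | hbase shell
```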

## Changelog
112 changes: 112 additions & 0 deletions docs/zh/concept/config.md
@@ -203,6 +203,118 @@ the sink module, you can complete this operation quickly and efficiently. Sink and source are very…
configured via `result_table_name` and `source_table_name`. But you will notice that in the config example above, not every module sets these parameters, because in SeaTunnel
there is a default convention: if these two parameters are not configured, the data generated by the last module of the previous node is used. This is very convenient when there is only one source.

## Config Variable Substitution

In the config file, we can define some variables and replace them at runtime. This is only supported for files in hocon format.

```hocon
env {
job.mode = "BATCH"
job.name = ${jobName}
parallelism = 2
}
source {
FakeSource {
result_table_name = ${resName}
row.num = ${rowNum}
string.template = ${strTemplate}
int.template = [20, 21]
schema = {
fields {
name = ${nameType}
age = "int"
}
}
}
}
transform {
sql {
source_table_name = "fake"
result_table_name = "sql"
query = "select * from "${resName}" where name = '"${nameVal}"' "
}
}
sink {
Console {
source_table_name = "sql"
username = ${username}
password = ${password}
}
}
```

In the above config, we define some variables, such as ${rowNum} and ${resName}.
We can replace these parameters with the following shell command:

```shell
./bin/seatunnel.sh -c <this_config_file>
-i jobName='this_is_a_job_name'
-i resName=fake
-i rowNum=10
-i strTemplate=['abc','d~f','hi']
-i nameType=string
-i nameVal=abc
-i username=seatunnel=2.3.1
-i password='$a^b%c.d~e0*9('
-e local
```

Then the final submitted config is:

```hocon
env {
job.mode = "BATCH"
job.name = "this_is_a_job_name"
parallelism = 2
}
source {
FakeSource {
result_table_name = "fake"
row.num = 10
string.template = ['abc','d~f','hi']
int.template = [20, 21]
schema = {
fields {
name = "string"
age = "int"
}
}
}
}
transform {
sql {
source_table_name = "fake"
result_table_name = "sql"
query = "select * from "fake" where name = 'abc' "
}
}
sink {
Console {
source_table_name = "sql"
username = "seatunnel=2.3.1"
password = "$a^b%c.d~e0*9("
}
}
```

Some notes:

- If the value contains special characters (such as `(`), wrap it in `'` quotes.
- If the replacement variable is wrapped in `"` or `'` (like `resName` and `nameVal`), you need to add `"`.
- The value cannot contain spaces `' '`. For example, `-i jobName='this is a job name'` will be replaced with `job.name = "this"`.
- If you want to use dynamic parameters, you can use the following format: `-i date=$(date +"%Y%m%d")`.

## What's More

If you want to know more details about the config format, please check [HOCON](https://github.com/lightbend/config/blob/main/HOCON.md).

2 changes: 1 addition & 1 deletion docs/zh/connector-v2/formats/avro.md
@@ -77,7 +77,7 @@ source {
bootstrap.servers = "kafkaCluster:9092"
topic = "test_avro_topic"
result_table_name = "kafka_table"
kafka.auto.offset.reset = "earliest"
start_mode = "earliest"
format = avro
format_error_handle_way = skip
schema = {
14 changes: 14 additions & 0 deletions docs/zh/connector-v2/sink/Hbase.md
@@ -119,6 +119,20 @@ Hbase {
```

## Writes To The Specified Column Family

```hocon
Hbase {
zookeeper_quorum = "hbase_e2e:2181"
table = "assign_cf_table"
rowkey_column = ["id"]
family_name {
c_double = "cf1"
c_bigint = "cf2"
}
}
```

## Changelog

### Next Version
@@ -217,8 +217,7 @@ public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)

@Override
public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists) {
dropTable(tablePath, ignoreIfNotExists);
createTable(tablePath, null, ignoreIfNotExists);
esRestClient.clearIndexData(tablePath.getTableName());
}

@Override
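
The rewritten `truncateTable` now delegates to `esRestClient.clearIndexData`, whose implementation appears in the client diff below: it issues a `_delete_by_query` request with a `match_all` query, so the documents are removed while the index and its mapping stay in place. A hand-run equivalent (a sketch, assuming an Elasticsearch node at `localhost:9200` and an index named `st_index`; both are placeholders):

```shell
# Delete every document in the index without dropping the index itself.
curl -X POST "http://localhost:9200/st_index/_delete_by_query" \
  -H 'Content-Type: application/json' \
  -d '{ "query": { "match_all": {} } }'
```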
@@ -480,6 +480,35 @@ public void dropIndex(String tableName) {
}
}

public void clearIndexData(String indexName) {
String endpoint = String.format("/%s/_delete_by_query", indexName);
Request request = new Request("POST", endpoint);
String jsonString = "{ \"query\": { \"match_all\": {} } }";
request.setJsonEntity(jsonString);

try {
Response response = restClient.performRequest(request);
if (response == null) {
throw new ElasticsearchConnectorException(
ElasticsearchConnectorErrorCode.CLEAR_INDEX_DATA_FAILED,
"POST " + endpoint + " response null");
}
// todo: if the index doesn't exist, the response status code is 200?
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
return;
} else {
throw new ElasticsearchConnectorException(
ElasticsearchConnectorErrorCode.CLEAR_INDEX_DATA_FAILED,
String.format(
"POST %s response status code=%d",
endpoint, response.getStatusLine().getStatusCode()));
}
} catch (IOException ex) {
throw new ElasticsearchConnectorException(
ElasticsearchConnectorErrorCode.CLEAR_INDEX_DATA_FAILED, ex);
}
}

/**
* get es field name and type mapping relation
*
@@ -28,7 +28,8 @@ public enum ElasticsearchConnectorErrorCode implements SeaTunnelErrorCode {
LIST_INDEX_FAILED("ELASTICSEARCH-05", "List elasticsearch index failed"),
DROP_INDEX_FAILED("ELASTICSEARCH-06", "Drop elasticsearch index failed"),
CREATE_INDEX_FAILED("ELASTICSEARCH-07", "Create elasticsearch index failed"),
ES_FIELD_TYPE_NOT_SUPPORT("ELASTICSEARCH-08", "Not support the elasticsearch field type");
ES_FIELD_TYPE_NOT_SUPPORT("ELASTICSEARCH-08", "Not support the elasticsearch field type"),
CLEAR_INDEX_DATA_FAILED("ELASTICSEARCH-09", "Clear elasticsearch index data failed");
;

private final String code;
@@ -80,6 +80,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
getPluginName(), PluginType.SINK, result.getMsg()));
}
this.hbaseParameters = HbaseParameters.buildWithSinkConfig(pluginConfig);
if (hbaseParameters.getFamilyNames().size() == 0) {
throw new HbaseConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
"The corresponding field options should be configured and should not be empty Refer to the hbase sink document");
}
}

@Override
@@ -41,6 +41,7 @@
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@@ -62,7 +63,7 @@ public class HbaseSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {

private final int versionColumnIndex;

private String defaultFamilyName = "value";
private String writeAllColumnFamily;

public HbaseSinkWriter(
SeaTunnelRowType seaTunnelRowType,
@@ -76,7 +77,7 @@ public HbaseSinkWriter(
this.versionColumnIndex = versionColumnIndex;

if (hbaseParameters.getFamilyNames().size() == 1) {
defaultFamilyName = hbaseParameters.getFamilyNames().getOrDefault(ALL_COLUMNS, "value");
this.writeAllColumnFamily = hbaseParameters.getFamilyNames().get(ALL_COLUMNS);
}

// initialize hbase configuration
@@ -131,8 +132,14 @@ private Put convertRowToPut(SeaTunnelRow row) {
.collect(Collectors.toList());
for (Integer writeColumnIndex : writeColumnIndexes) {
String fieldName = seaTunnelRowType.getFieldName(writeColumnIndex);
// This is the family of columns that we define to be written through the .conf file
Map<String, String> configurationFamilyNames = hbaseParameters.getFamilyNames();
String familyName =
hbaseParameters.getFamilyNames().getOrDefault(fieldName, defaultFamilyName);
configurationFamilyNames.getOrDefault(fieldName, writeAllColumnFamily);
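// Skip any field that is neither covered by an all_columns mapping nor explicitly mapped to a column family in family_name.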
if (!configurationFamilyNames.containsKey(ALL_COLUMNS)
&& !configurationFamilyNames.containsKey(fieldName)) {
continue;
}
byte[] bytes = convertColumnToBytes(row, writeColumnIndex);
if (bytes != null) {
put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(fieldName), bytes);