diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml index 9975d477dae..b6094aff4d3 100644 --- a/.github/workflows/backend.yml +++ b/.github/workflows/backend.yml @@ -1052,7 +1052,7 @@ jobs: kafka-connector-it: needs: [ changes, sanity-check ] - if: needs.changes.outputs.api == 'true' + if: needs.changes.outputs.api == 'true' || contains(needs.changes.outputs.it-modules, 'connector-kafka-e2e') runs-on: ${{ matrix.os }} strategy: matrix: @@ -1068,7 +1068,6 @@ jobs: distribution: 'temurin' cache: 'maven' - name: run kafka connector integration test - if: needs.changes.outputs.api == 'true' run: | ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-kafka-e2e -am -Pci env: @@ -1076,7 +1075,7 @@ jobs: rocketmq-connector-it: needs: [ changes, sanity-check ] - if: needs.changes.outputs.api == 'true' + if: needs.changes.outputs.api == 'true' || contains(needs.changes.outputs.it-modules, 'connector-rocketmq-e2e') runs-on: ${{ matrix.os }} strategy: matrix: @@ -1092,7 +1091,6 @@ jobs: distribution: 'temurin' cache: 'maven' - name: run rocket connector integration test - if: needs.changes.outputs.api == 'true' run: | ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-rocketmq-e2e -am -Pci env: diff --git a/docs/en/concept/config.md b/docs/en/concept/config.md index 6f17ea03b3b..a8c58bae2de 100644 --- a/docs/en/concept/config.md +++ b/docs/en/concept/config.md @@ -315,6 +315,7 @@ Some Notes: - quota with `'` if the value has special character (like `(`) - if the replacement variables is in `"` or `'`, like `resName` and `nameVal`, you need add `"` - the value can't have space `' '`, like `-i jobName='this is a job name' `, this will be replaced to `job.name = "this"` +- If you want to use dynamic parameters, you can use the following format: `-i date=$(date +"%Y%m%d")`.
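For instance, a minimal sketch of passing a dynamic value, assuming a job config (hypothetical path `config/my_job.conf`) that references `${date}`; the shell expands the command substitution before SeaTunnel performs the variable replacement:

```shell
# $(date +"%Y%m%d") is expanded by the shell, so SeaTunnel receives e.g. -i date=20240115
# and replaces ${date} in the config with that value
./bin/seatunnel.sh -c config/my_job.conf -i date=$(date +"%Y%m%d") -e local
```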
## What's More diff --git a/docs/en/connector-v2/formats/avro.md b/docs/en/connector-v2/formats/avro.md index 638657b3456..8fef411fb58 100644 --- a/docs/en/connector-v2/formats/avro.md +++ b/docs/en/connector-v2/formats/avro.md @@ -77,7 +77,7 @@ source { bootstrap.servers = "kafkaCluster:9092" topic = "test_avro_topic" result_table_name = "kafka_table" - kafka.auto.offset.reset = "earliest" + start_mode = "earliest" format = avro format_error_handle_way = skip schema = { diff --git a/docs/en/connector-v2/sink/Hbase.md b/docs/en/connector-v2/sink/Hbase.md index 51cb4b33622..dd75d21f0be 100644 --- a/docs/en/connector-v2/sink/Hbase.md +++ b/docs/en/connector-v2/sink/Hbase.md @@ -116,7 +116,20 @@ Hbase { all_columns = seatunnel } } +``` + +## Writes To The Specified Column Family +```hocon +Hbase { + zookeeper_quorum = "hbase_e2e:2181" + table = "assign_cf_table" + rowkey_column = ["id"] + family_name { + c_double = "cf1" + c_bigint = "cf2" + } +} ``` ## Changelog diff --git a/docs/zh/concept/config.md b/docs/zh/concept/config.md index baa9a7a7153..8f4368a67f4 100644 --- a/docs/zh/concept/config.md +++ b/docs/zh/concept/config.md @@ -203,6 +203,118 @@ sink模块,你可以快速高效地完成这个操作。Sink和source非常相 `result_table_name` 和 `source_table_name` 配置。但你会发现在上面的配置例子中,不是每个模块都配置了这些参数,因为在SeaTunnel中, 有一个默认的约定,如果这两个参数没有配置,则使用上一个节点的最后一个模块生成的数据。当只有一个source时这是非常方便的。 +## 配置变量替换 + +在配置文件中,我们可以定义一些变量并在运行时替换它们。这仅支持 hocon 格式的文件。 + +```hocon +env { + job.mode = "BATCH" + job.name = ${jobName} + parallelism = 2 +} + +source { + FakeSource { + result_table_name = ${resName} + row.num = ${rowNum} + string.template = ${strTemplate} + int.template = [20, 21] + schema = { + fields { + name = ${nameType} + age = "int" + } + } + } +} + +transform { + sql { + source_table_name = "fake" + result_table_name = "sql" + query = "select * from "${resName}" where name = '"${nameVal}"' " + } + +} + +sink { + Console { + source_table_name = "sql" + username = ${username} + password = ${password} + } +} + +``` + +在上述配置中,我们定义了一些变量,如 ${rowNum}、${resName}。 +我们可以使用以下 shell 命令替换这些参数: + +```shell +./bin/seatunnel.sh -c +-i jobName='this_is_a_job_name' +-i resName=fake +-i rowNum=10 +-i strTemplate=['abc','d~f','hi'] +-i nameType=string +-i nameVal=abc +-i username=seatunnel=2.3.1 +-i password='$a^b%c.d~e0*9(' +-e local +``` + +然后最终提交的配置是: + +```hocon +env { + job.mode = "BATCH" + job.name = "this_is_a_job_name" + parallelism = 2 +} + +source { + FakeSource { + result_table_name = "fake" + row.num = 10 + string.template = ['abc','d~f','hi'] + int.template = [20, 21] + schema = { + fields { + name = "string" + age = "int" + } + } + } +} + +transform { + sql { + source_table_name = "fake" + result_table_name = "sql" + query = "select * from "fake" where name = 'abc' " + } + +} + +sink { + Console { + source_table_name = "sql" + username = "seatunnel=2.3.1" + password = "$a^b%c.d~e0*9(" + } +} + +``` + +一些注意事项: + +- 如果值包含特殊字符(如`(`),请使用`'`引号将其括起来。 +- 如果替换变量包含`"`或`'`(如`"resName"`和`"nameVal"`),需要添加`"`。 +- 值不能包含空格`' '`。例如, `-i jobName='this is a job name'`将被替换为`job.name = "this"`。 +- 如果要使用动态参数,可以使用以下格式: `-i date=$(date +"%Y%m%d")`。 + ## 此外 如果你想了解更多关于格式配置的详细信息,请查看 [HOCON](https://github.com/lightbend/config/blob/main/HOCON.md)。 + diff --git a/docs/zh/connector-v2/formats/avro.md b/docs/zh/connector-v2/formats/avro.md index 4e19ea4b982..7176f4e507f 100644 --- a/docs/zh/connector-v2/formats/avro.md +++ b/docs/zh/connector-v2/formats/avro.md @@ -77,7 +77,7 @@ source { bootstrap.servers = "kafkaCluster:9092" topic = "test_avro_topic" result_table_name = 
"kafka_table" - kafka.auto.offset.reset = "earliest" + start_mode = "earliest" format = avro format_error_handle_way = skip schema = { diff --git a/docs/zh/connector-v2/sink/Hbase.md b/docs/zh/connector-v2/sink/Hbase.md index a9839dbafa0..871cad206c6 100644 --- a/docs/zh/connector-v2/sink/Hbase.md +++ b/docs/zh/connector-v2/sink/Hbase.md @@ -119,6 +119,20 @@ Hbase { ``` +## 写入指定列族 + +```hocon +Hbase { + zookeeper_quorum = "hbase_e2e:2181" + table = "assign_cf_table" + rowkey_column = ["id"] + family_name { + c_double = "cf1" + c_bigint = "cf2" + } +} +``` + ## 更改日志 ### 下一个版本 diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java index b1eb60e2899..bbf594eb10b 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java @@ -217,8 +217,7 @@ public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists) @Override public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists) { - dropTable(tablePath, ignoreIfNotExists); - createTable(tablePath, null, ignoreIfNotExists); + esRestClient.clearIndexData(tablePath.getTableName()); } @Override diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java index 18c9b7c109b..f80f20f6736 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java @@ -480,6 +480,35 @@ public void dropIndex(String tableName) { } } + public void clearIndexData(String indexName) { + String endpoint = String.format("/%s/_delete_by_query", indexName); + Request request = new Request("POST", endpoint); + String jsonString = "{ \"query\": { \"match_all\": {} } }"; + request.setJsonEntity(jsonString); + + try { + Response response = restClient.performRequest(request); + if (response == null) { + throw new ElasticsearchConnectorException( + ElasticsearchConnectorErrorCode.CLEAR_INDEX_DATA_FAILED, + "POST " + endpoint + " response null"); + } + // todo: if the index doesn't exist, the response status code is 200? 
+ if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { + return; + } else { + throw new ElasticsearchConnectorException( + ElasticsearchConnectorErrorCode.CLEAR_INDEX_DATA_FAILED, + String.format( + "POST %s response status code=%d", + endpoint, response.getStatusLine().getStatusCode())); + } + } catch (IOException ex) { + throw new ElasticsearchConnectorException( + ElasticsearchConnectorErrorCode.CLEAR_INDEX_DATA_FAILED, ex); + } + } + /** * get es field name and type mapping realtion * diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java index 67f01201dd6..fe182868d4d 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java @@ -28,7 +28,8 @@ public enum ElasticsearchConnectorErrorCode implements SeaTunnelErrorCode { LIST_INDEX_FAILED("ELASTICSEARCH-05", "List elasticsearch index failed"), DROP_INDEX_FAILED("ELASTICSEARCH-06", "Drop elasticsearch index failed"), CREATE_INDEX_FAILED("ELASTICSEARCH-07", "Create elasticsearch index failed"), - ES_FIELD_TYPE_NOT_SUPPORT("ELASTICSEARCH-08", "Not support the elasticsearch field type"); + ES_FIELD_TYPE_NOT_SUPPORT("ELASTICSEARCH-08", "Not support the elasticsearch field type"), + CLEAR_INDEX_DATA_FAILED("ELASTICSEARCH-09", "Clear elasticsearch index data failed"); ; private final String code; diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java index 2555f3d5b7f..4f7b929223f 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java @@ -80,6 +80,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException { getPluginName(), PluginType.SINK, result.getMsg())); } this.hbaseParameters = HbaseParameters.buildWithSinkConfig(pluginConfig); + if (hbaseParameters.getFamilyNames().size() == 0) { + throw new HbaseConnectorException( + SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, + "The corresponding field options should be configured and should not be empty. Refer to the hbase sink document"); + } } @Override diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java index 72722e582e3..7683d6aab0b 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java @@ -41,6 +41,7 @@ import java.io.IOException; import 
java.nio.charset.Charset; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -62,7 +63,7 @@ public class HbaseSinkWriter extends AbstractSinkWriter { private final int versionColumnIndex; - private String defaultFamilyName = "value"; + private String writeAllColumnFamily; public HbaseSinkWriter( SeaTunnelRowType seaTunnelRowType, @@ -76,7 +77,7 @@ public HbaseSinkWriter( this.versionColumnIndex = versionColumnIndex; if (hbaseParameters.getFamilyNames().size() == 1) { - defaultFamilyName = hbaseParameters.getFamilyNames().getOrDefault(ALL_COLUMNS, "value"); + this.writeAllColumnFamily = hbaseParameters.getFamilyNames().get(ALL_COLUMNS); } // initialize hbase configuration @@ -131,8 +132,14 @@ private Put convertRowToPut(SeaTunnelRow row) { .collect(Collectors.toList()); for (Integer writeColumnIndex : writeColumnIndexes) { String fieldName = seaTunnelRowType.getFieldName(writeColumnIndex); + // This is the family of columns that we define to be written through the.conf file + Map configurationFamilyNames = hbaseParameters.getFamilyNames(); String familyName = - hbaseParameters.getFamilyNames().getOrDefault(fieldName, defaultFamilyName); + configurationFamilyNames.getOrDefault(fieldName, writeAllColumnFamily); + if (!configurationFamilyNames.containsKey(ALL_COLUMNS) + && !configurationFamilyNames.containsKey(fieldName)) { + continue; + } byte[] bytes = convertColumnToBytes(row, writeColumnIndex); if (bytes != null) { put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(fieldName), bytes); diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java index d136fabc403..02c2a9007e1 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java @@ -45,6 +45,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; @@ -103,7 +104,7 @@ public void pollNext(Collector output) throws Exception { return; } - while (pendingPartitionsQueue.size() != 0) { + while (!pendingPartitionsQueue.isEmpty()) { sourceSplits.add(pendingPartitionsQueue.poll()); } sourceSplits.forEach( @@ -120,9 +121,10 @@ public void pollNext(Collector output) throws Exception { executorService.submit(thread); return thread; })); + List finishedSplits = new CopyOnWriteArrayList<>(); sourceSplits.forEach( sourceSplit -> { - CompletableFuture completableFuture = new CompletableFuture<>(); + CompletableFuture completableFuture = new CompletableFuture<>(); TablePath tablePath = sourceSplit.getTablePath(); DeserializationSchema deserializationSchema = tablePathMetadataMap.get(tablePath).getDeserializationSchema(); @@ -148,9 +150,14 @@ public void pollNext(Collector output) throws Exception { for (TopicPartition partition : partitions) { List> recordList = records.records(partition); + if (Boundedness.BOUNDED.equals( + context.getBoundedness()) + && recordList.isEmpty()) { + 
completableFuture.complete(true); + return; + } for (ConsumerRecord record : recordList) { - try { if (deserializationSchema instanceof @@ -180,7 +187,8 @@ public void pollNext(Collector output) throws Exception { && record.offset() >= sourceSplit .getEndOffset()) { - break; + completableFuture.complete(true); + return; } } long lastOffset = -1; @@ -199,18 +207,21 @@ public void pollNext(Collector output) throws Exception { } catch (Exception e) { completableFuture.completeExceptionally(e); } - completableFuture.complete(null); + completableFuture.complete(false); }); - } catch (InterruptedException e) { + if (completableFuture.get()) { + finishedSplits.add(sourceSplit); + } + } catch (Exception e) { throw new KafkaConnectorException( KafkaConnectorErrorCode.CONSUME_DATA_FAILED, e); } - completableFuture.join(); }); - if (Boundedness.BOUNDED.equals(context.getBoundedness())) { - // signal to the source that we have reached the end of the data. - context.signalNoMoreElement(); + finishedSplits.forEach(sourceSplits::remove); + if (sourceSplits.isEmpty()) { + context.signalNoMoreElement(); + } } } diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java index 13d969c3266..5d620c96eea 100644 --- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java +++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java @@ -44,7 +44,9 @@ public abstract class AbstractCommandArgs extends CommandArgs { splitter = ParameterSplitter.class, description = "Variable substitution, such as -i city=beijing, or -i date=20190318." - + "We use ',' as separator, when inside \"\", ',' are treated as normal characters instead of delimiters.") + + "We use ',' as separator, when inside \"\", ',' are treated as normal characters instead of delimiters." + + " For example, -i city=\"beijing,shanghai\". 
If you want to use dynamic parameters," + + " you can use the following format: -i date=$(date +\"%Y%m%d\").") protected List variables = Collections.emptyList(); /** check config flag */ diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java index 3180f386b27..623dd9d2210 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java @@ -447,35 +447,78 @@ public void tearDown() { } @Test - public void testCatalog() { + public void testCatalog() throws InterruptedException, JsonProcessingException { Map configMap = new HashMap<>(); configMap.put("username", "elastic"); configMap.put("password", "elasticsearch"); - configMap.put("hosts", Arrays.asList("https://" + container.getHttpHostAddress())); + configMap.put( + "hosts", Collections.singletonList("https://" + container.getHttpHostAddress())); configMap.put("index", "st_index3"); configMap.put("tls_verify_certificate", false); configMap.put("tls_verify_hostname", false); configMap.put("index_type", "st"); + final ElasticSearchCatalog elasticSearchCatalog = new ElasticSearchCatalog("Elasticsearch", "", ReadonlyConfig.fromMap(configMap)); elasticSearchCatalog.open(); + TablePath tablePath = TablePath.of("", "st_index3"); - // index exists + + // Verify index does not exist initially final boolean existsBefore = elasticSearchCatalog.tableExists(tablePath); - Assertions.assertFalse(existsBefore); - // create index + Assertions.assertFalse(existsBefore, "Index should not exist initially"); + + // Create index elasticSearchCatalog.createTable(tablePath, null, false); final boolean existsAfter = elasticSearchCatalog.tableExists(tablePath); - Assertions.assertTrue(existsAfter); - // data exists? 
- final boolean existsData = elasticSearchCatalog.isExistsData(tablePath); - Assertions.assertFalse(existsData); - // truncate + Assertions.assertTrue(existsAfter, "Index should be created"); + + // Generate and add multiple records + List data = generateTestData(); + StringBuilder requestBody = new StringBuilder(); + String indexHeader = "{\"index\":{\"_index\":\"st_index3\"}}\n"; + for (String record : data) { + requestBody.append(indexHeader); + requestBody.append(record); + requestBody.append("\n"); + } + esRestClient.bulk(requestBody.toString()); + Thread.sleep(2000); // Wait for data to be indexed + + // Verify data exists + List sourceFields = Arrays.asList("field1", "field2"); + Map query = new HashMap<>(); + query.put("match_all", new HashMap<>()); + ScrollResult scrollResult = + esRestClient.searchByScroll("st_index3", sourceFields, query, "1m", 100); + Assertions.assertFalse(scrollResult.getDocs().isEmpty(), "Data should exist in the index"); + + // Truncate the table elasticSearchCatalog.truncateTable(tablePath, false); - Assertions.assertTrue(elasticSearchCatalog.tableExists(tablePath)); - // drop + Thread.sleep(2000); // Wait for data to be indexed + + // Verify data is deleted + scrollResult = esRestClient.searchByScroll("st_index3", sourceFields, query, "1m", 100); + Assertions.assertTrue( + scrollResult.getDocs().isEmpty(), "Data should be deleted from the index"); + + // Drop the table elasticSearchCatalog.dropTable(tablePath, false); - Assertions.assertFalse(elasticSearchCatalog.tableExists(tablePath)); + Assertions.assertFalse( + elasticSearchCatalog.tableExists(tablePath), "Index should be dropped"); + elasticSearchCatalog.close(); } + + private List generateTestData() throws JsonProcessingException { + List data = new ArrayList<>(); + ObjectMapper objectMapper = new ObjectMapper(); + for (int i = 0; i < 10; i++) { + Map record = new HashMap<>(); + record.put("field1", "value" + i); + record.put("field2", i); + data.add(objectMapper.writeValueAsString(record)); + } + return data; + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java index d3cd57b326f..13a7a8805a6 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java @@ -56,6 +56,7 @@ public class HbaseIT extends TestSuiteBase implements TestResource { private static final String TABLE_NAME = "seatunnel_test"; + private static final String ASSIGN_CF_TABLE_NAME = "assign_cf_table"; private static final String FAMILY_NAME = "info"; @@ -64,6 +65,7 @@ public class HbaseIT extends TestSuiteBase implements TestResource { private Admin admin; private TableName table; + private TableName tableAssign; private HbaseCluster hbaseCluster; @@ -75,7 +77,9 @@ public void startUp() throws Exception { // Create table for hbase sink test log.info("initial"); hbaseCluster.createTable(TABLE_NAME, Arrays.asList(FAMILY_NAME)); + hbaseCluster.createTable(ASSIGN_CF_TABLE_NAME, Arrays.asList("cf1", "cf2")); table = TableName.valueOf(TABLE_NAME); + tableAssign = TableName.valueOf(ASSIGN_CF_TABLE_NAME); } @AfterAll @@ -133,6 +137,46 @@ public void testHbaseSinkWithArray(TestContainer container) 
scanner.close(); } + @TestTemplate + public void testHbaseSinkAssignCfSink(TestContainer container) + throws IOException, InterruptedException { + deleteData(tableAssign); + + Container.ExecResult sinkExecResult = container.executeJob("/fake-to-assign-cf-hbase.conf"); + Assertions.assertEquals(0, sinkExecResult.getExitCode()); + + Table hbaseTable = hbaseConnection.getTable(tableAssign); + Scan scan = new Scan(); + ResultScanner scanner = hbaseTable.getScanner(scan); + ArrayList results = new ArrayList<>(); + for (Result result : scanner) { + results.add(result); + } + + Assertions.assertEquals(results.size(), 5); + + if (scanner != null) { + scanner.close(); + } + int cf1Count = 0; + int cf2Count = 0; + + for (Result result : results) { + for (Cell cell : result.listCells()) { + String family = Bytes.toString(CellUtil.cloneFamily(cell)); + if ("cf1".equals(family)) { + cf1Count++; + } + if ("cf2".equals(family)) { + cf2Count++; + } + } + } + // check cf1 and cf2 + Assertions.assertEquals(cf1Count, 5); + Assertions.assertEquals(cf2Count, 5); + } + private void deleteData(TableName table) throws IOException { Table hbaseTable = hbaseConnection.getTable(table); Scan scan = new Scan(); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-assign-cf-hbase.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-assign-cf-hbase.conf new file mode 100644 index 00000000000..26f2307dfd1 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-assign-cf-hbase.conf @@ -0,0 +1,47 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + schema = { + fields { + id = int + c_double = double + c_bigint = bigint + } + } + } +} + +sink { + Hbase { + zookeeper_quorum = "hbase_e2e:2181" + table = "assign_cf_table" + rowkey_column = ["id"] + family_name { + c_double = "cf1" + c_bigint = "cf2" + } + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java index 2f1c92048e0..d4629851e79 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java @@ -212,6 +212,27 @@ public void testSourceKafkaTextToConsole(TestContainer container) Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); } + @TestTemplate + public void testSourceKafkaToAssertWithMaxPollRecords1(TestContainer container) + throws IOException, InterruptedException { + TextSerializationSchema serializer = + TextSerializationSchema.builder() + .seaTunnelRowType(SEATUNNEL_ROW_TYPE) + .delimiter(",") + .build(); + generateTestData( + row -> + new ProducerRecord<>( + "test_topic_text_max_poll_records_1", + null, + serializer.serialize(row)), + 0, + 100); + Container.ExecResult execResult = + container.executeJob("/kafka/kafka_source_to_assert_with_max_poll_records_1.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + @TestTemplate public void testSourceKafkaTextToConsoleAssertCatalogTable(TestContainer container) throws IOException, InterruptedException { @@ -538,29 +559,34 @@ private Properties kafkaByteConsumerConfig() { } private void generateTestData(ProducerRecordConverter converter, int start, int end) { - for (int i = start; i < end; i++) { - SeaTunnelRow row = - new SeaTunnelRow( - new Object[] { - Long.valueOf(i), - Collections.singletonMap("key", Short.parseShort("1")), - new Byte[] {Byte.parseByte("1")}, - "string", - Boolean.FALSE, - Byte.parseByte("1"), - Short.parseShort("1"), - Integer.parseInt("1"), - Long.parseLong("1"), - Float.parseFloat("1.1"), - Double.parseDouble("1.1"), - BigDecimal.valueOf(11, 1), - "test".getBytes(), - LocalDate.of(2024, 1, 1), - LocalDateTime.of(2024, 1, 1, 12, 59, 23) - }); - ProducerRecord producerRecord = converter.convert(row); - producer.send(producerRecord); + try { + for (int i = start; i < end; i++) { + SeaTunnelRow row = + new SeaTunnelRow( + new Object[] { + Long.valueOf(i), + Collections.singletonMap("key", Short.parseShort("1")), + new Byte[] {Byte.parseByte("1")}, + "string", + Boolean.FALSE, + Byte.parseByte("1"), + Short.parseShort("1"), + Integer.parseInt("1"), + Long.parseLong("1"), + Float.parseFloat("1.1"), + Double.parseDouble("1.1"), + BigDecimal.valueOf(11, 1), + "test".getBytes(), + LocalDate.of(2024, 1, 1), + LocalDateTime.of(2024, 1, 1, 12, 59, 23) + }); + ProducerRecord producerRecord = converter.convert(row); + producer.send(producerRecord).get(); + } + } catch (Exception e) { + throw new RuntimeException(e); } + producer.flush(); } private static final SeaTunnelRowType SEATUNNEL_ROW_TYPE = diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf index 31fe77a3e24..755a9a2b8d5 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf @@ -32,7 +32,7 @@ source { bootstrap.servers = "kafkaCluster:9092" topic = "test_avro_topic" result_table_name = "kafka_table" - kafka.auto.offset.reset = "earliest" + start_mode = "earliest" format = avro format_error_handle_way = skip schema = { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/jsonFormatIT/kafka_source_json_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/jsonFormatIT/kafka_source_json_to_console.conf index f9a41e7987d..3657390602e 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/jsonFormatIT/kafka_source_json_to_console.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/jsonFormatIT/kafka_source_json_to_console.conf @@ -32,7 +32,7 @@ source { bootstrap.servers = "kafkaCluster:9092" topic = "test_topic_json" result_table_name = "kafka_table" - kafka.auto.offset.reset = "earliest" + start_mode = "earliest" format_error_handle_way = skip schema = { fields { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_source_to_assert_with_max_poll_records_1.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_source_to_assert_with_max_poll_records_1.conf new file mode 100644 index 00000000000..787858e229f --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_source_to_assert_with_max_poll_records_1.conf @@ -0,0 +1,160 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + parallelism = 1 + job.mode = "BATCH" + + # You can set spark configuration here + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + Kafka { + bootstrap.servers = "kafkaCluster:9092" + topic = "test_topic_text_max_poll_records_1" + result_table_name = "kafka_table" + start_mode = "earliest" + format_error_handle_way = fail + kafka.config = { + max.poll.records = 1 + } + schema = { + columns = [ + { + name = id + type = bigint + } + { + name = c_map + type = "map" + } + { + name = c_array + type = "array" + } + { + name = c_string + type = "string" + } + { + name = c_boolean + type = "boolean" + } + { + name = c_tinyint + type = "tinyint" + } + { + name = c_smallint + type = "smallint" + } + { + name = c_int + type = "int" + } + { + name = c_bigint + type = "bigint" + } + { + name = c_float + type = "float" + } + { + name = c_double + type = "double" + } + { + name = c_decimal + type = "decimal(2, 1)" + } + { + name = c_bytes + type = "bytes" + } + { + name = c_date + type = "date" + } + { + name = c_timestamp + type = "timestamp" + } + ] + primaryKey = { + name = "primary key" + columnNames = ["id"] + } + constraintKeys = [ + { + constraintName = "unique_c_string" + constraintType = UNIQUE_KEY + constraintColumns = [ + { + columnName = "c_string" + sortType = ASC + } + ] + } + ] + } + format = text + field_delimiter = "," + } +} + +sink { + console { + source_table_name = "kafka_table" + } + Assert { + source_table_name = "kafka_table" + rules = + { + field_rules = [ + { + field_name = id + field_type = bigint + field_value = [ + { + rule_type = NOT_NULL + }, + { + rule_type = MIN + rule_value = 0 + }, + { + rule_type = MAX + rule_value = 99 + } + ] + } + ] + row_rules = [ + { + rule_type = MIN_ROW + rule_value = 100 + } + ] + } + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf index b6db50989a3..d2a0f05354d 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf @@ -37,7 +37,6 @@ source { result_table_name = "kafka_table" start_mode = "earliest" format_error_handle_way = fail - # kafka.auto.offset.reset = "earliest" schema = { fields { id = bigint diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf index 45b29d19154..88b6098b5e5 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf @@ -37,7 +37,6 @@ source { result_table_name = "kafka_table" start_mode = "earliest" format_error_handle_way = skip - # kafka.auto.offset.reset = "earliest" schema = { fields { 
id = bigint diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console.conf index 36f01c0337c..3ce077bd589 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console.conf @@ -32,7 +32,7 @@ source { bootstrap.servers = "kafkaCluster:9092" topic = "test_topic_text" result_table_name = "kafka_table" - kafka.auto.offset.reset = "earliest" + start_mode = "earliest" format_error_handle_way = fail schema = { fields { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf index d7f875272b0..132829e3244 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf @@ -32,7 +32,7 @@ source { bootstrap.servers = "kafkaCluster:9092" topic = "test_topic_text" result_table_name = "kafka_table" - kafka.auto.offset.reset = "earliest" + start_mode = "earliest" format_error_handle_way = fail schema = { columns = [ diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemoryAggregatedCommitter.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemoryAggregatedCommitter.java index adc4c1cf2f2..e63175c4f6e 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemoryAggregatedCommitter.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemoryAggregatedCommitter.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.e2e.sink.inmemory; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.sink.MultiTableResourceManager; import org.apache.seatunnel.api.sink.SinkAggregatedCommitter; import org.apache.seatunnel.api.sink.SupportMultiTableSinkAggregatedCommitter; @@ -32,6 +33,11 @@ public class InMemoryAggregatedCommitter private static final List events = new ArrayList<>(); private static final List resourceManagers = new ArrayList<>(); + private ReadonlyConfig config; + + public InMemoryAggregatedCommitter(ReadonlyConfig config) { + this.config = config; + } public static List getEvents() { return events; @@ -62,6 +68,9 @@ public void setMultiTableResourceManager( @Override public List commit( List aggregatedCommitInfo) throws IOException { + if (config.get(InMemorySinkFactory.THROW_EXCEPTION_OF_COMMITTER)) { + throw new IOException("commit failed"); + } return new ArrayList<>(); } diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java index 8f1eba9af47..9e3852fb3cf 100644 --- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java @@ -68,7 +68,7 @@ public Optional> getCommitInfoSerializer() { @Override public Optional> createAggregatedCommitter() throws IOException { - return Optional.of(new InMemoryAggregatedCommitter()); + return Optional.of(new InMemoryAggregatedCommitter(config)); } @Override diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java index 7b06ec99d97..1ab973652f9 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java @@ -41,6 +41,9 @@ public class InMemorySinkFactory public static final Option CHECKPOINT_SLEEP = Options.key("checkpoint_sleep").booleanType().defaultValue(false); + public static final Option THROW_EXCEPTION_OF_COMMITTER = + Options.key("throw_exception_of_committer").booleanType().defaultValue(false); + @Override public String factoryIdentifier() { return "InMemory"; @@ -49,7 +52,11 @@ public String factoryIdentifier() { @Override public OptionRule optionRule() { return OptionRule.builder() - .optional(THROW_EXCEPTION, THROW_OUT_OF_MEMORY, CHECKPOINT_SLEEP) + .optional( + THROW_EXCEPTION, + THROW_OUT_OF_MEMORY, + CHECKPOINT_SLEEP, + THROW_EXCEPTION_OF_COMMITTER) .build(); } diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/UserVariableIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/UserVariableIT.java index 03455af2b07..87e05821b1f 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/UserVariableIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/UserVariableIT.java @@ -34,7 +34,7 @@ public class UserVariableIT extends TestSuiteBase { public void userVariableTest(TestContainer container) throws IOException, InterruptedException { List variables = new ArrayList<>(); String list = "[abc,def]"; - variables.add("resName=fake"); + variables.add("resName=a$(date +\"%Y%m%d\")"); variables.add("rowNum=10"); variables.add("strTemplate=" + list); variables.add("nameType=string"); @@ -42,6 +42,6 @@ public void userVariableTest(TestContainer container) throws IOException, Interr variables.add("sourceTableName=sql"); Container.ExecResult execResult = container.executeJob("/fake_to_console.variables.conf", variables); - Assertions.assertEquals(0, execResult.getExitCode()); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); } } diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/fake_to_console.variables.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/fake_to_console.variables.conf index 48f7ec548bf..41f5bbc77b0 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/fake_to_console.variables.conf +++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/fake_to_console.variables.conf @@ -46,7 +46,7 @@ transform { # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, # please go to https://seatunnel.apache.org/docs/category/transform-v2 sql { - source_table_name = "fake" + source_table_name = ${resName} query = "select * from "${resName}" where name = '"${nameVal}"' " result_table_name = "sql" } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java index 2b7498f4856..8735048eac3 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java @@ -293,6 +293,9 @@ private void handleCoordinatorError(CheckpointCloseReason reason, Throwable e) { private void restoreTaskState(TaskLocation taskLocation) { List states = new ArrayList<>(); if (latestCompletedCheckpoint != null) { + if (!latestCompletedCheckpoint.isRestored()) { + latestCompletedCheckpoint.setRestored(true); + } final Integer currentParallelism = pipelineTasks.get(taskLocation.getTaskVertexId()); plan.getSubtaskActions() .get(taskLocation) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java index 74be7952051..7865b9c4dc2 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java @@ -46,7 +46,7 @@ public class CompletedCheckpoint implements Checkpoint, Serializable { private final Map taskStatistics; - @Getter @Setter private boolean isRestored = false; + @Getter @Setter private volatile boolean isRestored = false; public CompletedCheckpoint( long jobId, diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointErrorRestoreEndTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointErrorRestoreEndTest.java new file mode 100644 index 00000000000..4893bd2c2b5 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointErrorRestoreEndTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.checkpoint; + +import org.apache.seatunnel.engine.core.job.JobStatus; +import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest; +import org.apache.seatunnel.engine.server.master.JobMaster; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; + +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.await; + +@DisabledOnOs(OS.WINDOWS) +public class CheckpointErrorRestoreEndTest + extends AbstractSeaTunnelServerTest { + public static String STREAM_CONF_WITH_ERROR_PATH = + "batch_fakesource_to_inmemory_with_commit_error.conf"; + + @Test + public void testCheckpointRestoreToFailEnd() { + long jobId = System.currentTimeMillis(); + startJob(jobId, STREAM_CONF_WITH_ERROR_PATH, false); + + JobMaster jobMaster = server.getCoordinatorService().getJobMaster(jobId); + Assertions.assertEquals(1, jobMaster.getPhysicalPlan().getPipelineList().size()); + await().atMost(120, TimeUnit.SECONDS) + .untilAsserted( + () -> + Assertions.assertEquals( + 3, + jobMaster + .getPhysicalPlan() + .getPipelineList() + .get(0) + .getPipelineRestoreNum())); + await().atMost(120, TimeUnit.SECONDS) + .untilAsserted( + () -> + Assertions.assertEquals( + server.getCoordinatorService().getJobStatus(jobId), + JobStatus.FAILED)); + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_inmemory_with_commit_error.conf b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_inmemory_with_commit_error.conf new file mode 100644 index 00000000000..b89ee138e27 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_inmemory_with_commit_error.conf @@ -0,0 +1,52 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +###### +###### This config file is a demonstration of batch processing in seatunnel config +###### + +env { + parallelism = 2 + job.mode = "BATCH" +} + +source { + # This is an example source plugin **only for testing and demonstrating the feature source plugin** + FakeSource { + result_table_name = "fake" + row.num = 100 + split.num = 5 + split.read-interval = 3000 + parallelism = 1 + schema = { + fields { + name = "string" + age = "int" + } + } + } +} + +transform { +} + +sink { + InMemory { + source_table_name="fake" + throw_exception_of_committer=true + } +} \ No newline at end of file