diff --git a/docs/zh/connector-v2/sink/ClickhouseFile.md b/docs/zh/connector-v2/sink/ClickhouseFile.md new file mode 100644 index 00000000000..b36a2982f53 --- /dev/null +++ b/docs/zh/connector-v2/sink/ClickhouseFile.md @@ -0,0 +1,138 @@ +# ClickhouseFile + +> Clickhouse文件数据接收器 + +## 描述 + +该接收器使用clickhouse-local程序生成clickhouse数据文件,随后将其发送至clickhouse服务器,这个过程也称为bulkload。该接收器仅支持表引擎为 'Distributed'的表,且`internal_replication`选项需要设置为`true`。支持批和流两种模式。 + +## 主要特性 + +- [ ] [精准一次](../../concept/connector-v2-features.md) + +:::小提示 + +你也可以采用JDBC的方式将数据写入Clickhouse。 + +::: + +## 接收器选项 + +| 名称 | 类型 | 是否必须 | 默认值 | +|------------------------|---------|------|----------------------------------------| +| host | string | yes | - | +| database | string | yes | - | +| table | string | yes | - | +| username | string | yes | - | +| password | string | yes | - | +| clickhouse_local_path | string | yes | - | +| sharding_key | string | no | - | +| copy_method | string | no | scp | +| node_free_password | boolean | no | false | +| node_pass | list | no | - | +| node_pass.node_address | string | no | - | +| node_pass.username | string | no | "root" | +| node_pass.password | string | no | - | +| compatible_mode | boolean | no | false | +| file_fields_delimiter | string | no | "\t" | +| file_temp_path | string | no | "/tmp/seatunnel/clickhouse-local/file" | +| common-options | | no | - | + +### host [string] + +`ClickHouse`集群地址,格式为`host:port`,允许同时指定多个`hosts`。例如`"host1:8123,host2:8123"`。 + +### database [string] + +`ClickHouse`数据库名。 + +### table [string] + +表名称。 + +### username [string] + +连接`ClickHouse`的用户名。 + +### password [string] + +连接`ClickHouse`的用户密码。 + +### sharding_key [string] + +当ClickhouseFile需要拆分数据时,需要考虑的问题是当前数据需要发往哪个节点,默认情况下采用的是随机算法,我们也可以使用'sharding_key'参数为某字段指定对应的分片算法。 + +### clickhouse_local_path [string] + +在spark节点上的clickhouse-local程序路径。由于每个任务都会被调用,所以每个spark节点上的clickhouse-local程序路径必须相同。 + +### copy_method [string] + +为文件传输指定方法,默认为scp,可选值为scp和rsync。 + +### node_free_password [boolean] + +由于seatunnel需要使用scp或者rsync进行文件传输,因此seatunnel需要clickhouse服务端访问权限。如果每个spark节点与clickhouse服务端都配置了免密登录,则可以将此选项配置为true,否则需要在node_pass参数中配置对应节点的密码。 + +### node_pass [list] + +用来保存所有clickhouse服务器地址及其对应的访问密码。 + +### node_pass.node_address [string] + +clickhouse服务器节点地址。 + +### node_pass.username [string] + +clickhouse服务器节点用户名,默认为root。 + +### node_pass.password [string] + +clickhouse服务器节点的访问密码。 + +### compatible_mode [boolean] + +在低版本的Clickhouse中,clickhouse-local程序不支持`--path`参数,需要设置该参数来采用其他方式实现`--path`参数功能。 + +### file_fields_delimiter [string] + +ClickHouseFile使用CSV格式来临时保存数据。但如果数据中包含CSV的分隔符,可能会导致程序异常。使用此配置可以避免该情况。配置的值必须正好为一个字符的长度。 + +### file_temp_path [string] + +ClickhouseFile本地存储临时文件的目录。 + +### common options + +Sink插件常用参数,请参考[Sink常用选项](common-options.md)获取更多细节信息。 + +## 示例 + +```hocon +ClickhouseFile { + host = "192.168.0.1:8123" + database = "default" + table = "fake_all" + username = "default" + password = "" + clickhouse_local_path = "/Users/seatunnel/Tool/clickhouse local" + sharding_key = "age" + node_free_password = false + node_pass = [{ + node_address = "192.168.0.1" + password = "seatunnel" + }] +} +``` + +## 变更日志 + +### 2.2.0-beta 2022-09-26 + +- 支持将数据写入ClickHouse文件并迁移到ClickHouse数据目录 + +### 随后版本 + +- [BugFix] 修复生成的数据部分名称冲突BUG并改进文件提交逻辑 [3416](https://github.com/apache/seatunnel/pull/3416) +- [Feature] 支持compatible_mode来兼容低版本的Clickhouse [3416](https://github.com/apache/seatunnel/pull/3416) + diff --git a/docs/zh/connector-v2/sink/Phoenix.md b/docs/zh/connector-v2/sink/Phoenix.md new file mode 100644 index 00000000000..9a3adc14e5c --- /dev/null +++ b/docs/zh/connector-v2/sink/Phoenix.md @@ -0,0 +1,63 @@ +# Phoenix + +> Phoenix 数据接收器 + +## 描述 + +该接收器是通过 [Jdbc数据连接器](Jdbc.md)来写Phoenix数据,支持批和流两种模式。测试的Phoenix版本为4.xx和5.xx。 +在底层实现上,通过Phoenix的jdbc驱动,执行upsert语句向HBase写入数据。 +使用Java JDBC连接Phoenix有两种方式:其一是使用JDBC连接zookeeper,其二是通过JDBC瘦客户端连接查询服务器。 + +> 提示1: 该接收器默认使用的是(thin)驱动jar包。如果需要使用(thick)驱动或者其他版本的Phoenix(thin)驱动,需要重新编译jdbc数据接收器模块。 +> +> 提示2: 该接收器还不支持精准一次语义(因为Phoenix还不支持XA事务)。 + +## 主要特性 + +- [ ] [精准一次](../../concept/connector-v2-features.md) + +## 接收器选项 + +### driver [string] + +phoenix(thick)驱动:`org.apache.phoenix.jdbc.PhoenixDriver` +phoenix(thin)驱动:`org.apache.phoenix.queryserver.client.Driver` + +### url [string] + +phoenix(thick)驱动:`jdbc:phoenix:localhost:2182/hbase` +phoenix(thin)驱动:`jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF` + +### common options + +Sink插件常用参数,请参考[Sink常用选项](common-options.md)获取更多细节信息。 + +## 示例 + +thick驱动: + +``` + Jdbc { + driver = org.apache.phoenix.jdbc.PhoenixDriver + url = "jdbc:phoenix:localhost:2182/hbase" + query = "upsert into test.sink(age, name) values(?, ?)" + } + +``` + +thin驱动: + +``` +Jdbc { + driver = org.apache.phoenix.queryserver.client.Driver + url = "jdbc:phoenix:thin:url=http://spark_e2e_phoenix_sink:8765;serialization=PROTOBUF" + query = "upsert into test.sink(age, name) values(?, ?)" +} +``` + +## 变更日志 + +### 2.2.0-beta 2022-09-26 + +- 增加Phoenix数据接收器 + diff --git a/docs/zh/connector-v2/sink/Rabbitmq.md b/docs/zh/connector-v2/sink/Rabbitmq.md new file mode 100644 index 00000000000..6562dd2fdc5 --- /dev/null +++ b/docs/zh/connector-v2/sink/Rabbitmq.md @@ -0,0 +1,122 @@ +# Rabbitmq + +> Rabbitmq 数据接收器 + +## 描述 + +该数据接收器是将数据写入Rabbitmq。 + +## 主要特性 + +- [ ] [精准一次](../../concept/connector-v2-features.md) + +## 接收器选项 + +| 名称 | 类型 | 是否必须 | 默认值 | +|----------------------------|---------|------|-------| +| host | string | yes | - | +| port | int | yes | - | +| virtual_host | string | yes | - | +| username | string | yes | - | +| password | string | yes | - | +| queue_name | string | yes | - | +| url | string | no | - | +| network_recovery_interval | int | no | - | +| topology_recovery_enabled | boolean | no | - | +| automatic_recovery_enabled | boolean | no | - | +| use_correlation_id | boolean | no | false | +| connection_timeout | int | no | - | +| rabbitmq.config | map | no | - | +| common-options | | no | - | + +### host [string] + +Rabbitmq服务器地址 + +### port [int] + +Rabbitmq服务器端口 + +### virtual_host [string] + +virtual host – 连接broker使用的vhost + +### username [string] + +连接broker时使用的用户名 + +### password [string] + +连接broker时使用的密码 + +### url [string] + +设置host、port、username、password和virtual host的简便方式。 + +### queue_name [string] + +数据写入的队列名。 + +### schema [Config] + +#### fields [Config] + +上游数据的模式字段。 + +### network_recovery_interval [int] + +自动恢复需等待多长时间才尝试重连,单位为毫秒。 + +### topology_recovery_enabled [boolean] + +设置为true,表示启用拓扑恢复。 + +### automatic_recovery_enabled [boolean] + +设置为true,表示启用连接恢复。 + +### use_correlation_id [boolean] + +接收到的消息是否都提供唯一ID,来删除重复的消息达到幂等(在失败的情况下) + +### connection_timeout [int] + +TCP连接建立的超时时间,单位为毫秒;0代表不限制。 + +### rabbitmq.config [map] + +In addition to the above parameters that must be specified by the RabbitMQ client, the user can also specify multiple non-mandatory parameters for the client, covering [all the parameters specified in the official RabbitMQ document](https://www.rabbitmq.com/configure.html). +除了上面提及必须设置的RabbitMQ客户端参数,你也还可以为客户端指定多个非强制参数,参见 [RabbitMQ官方文档参数设置](https://www.rabbitmq.com/configure.html)。 + +### common options + +Sink插件常用参数,请参考[Sink常用选项](common-options.md)获取更多细节信息。 + +## 示例 + +simple: + +```hocon +sink { + RabbitMQ { + host = "rabbitmq-e2e" + port = 5672 + virtual_host = "/" + username = "guest" + password = "guest" + queue_name = "test1" + rabbitmq.config = { + requested-heartbeat = 10 + connection-timeout = 10 + } + } +} +``` + +## 变更日志 + +### 随后版本 + +- 增加Rabbitmq数据接收器 +- [Improve] 将连接器自定义配置前缀的数据类型更改为Map [3719](https://github.com/apache/seatunnel/pull/3719) + diff --git a/docs/zh/connector-v2/sink/StarRocks.md b/docs/zh/connector-v2/sink/StarRocks.md new file mode 100644 index 00000000000..6be7ff7e8e0 --- /dev/null +++ b/docs/zh/connector-v2/sink/StarRocks.md @@ -0,0 +1,288 @@ +# StarRocks + +> StarRocks 数据接收器 + +## 引擎支持 + +> Spark
+> Flink
+> SeaTunnel Zeta
+ +## 主要特性 + +- [ ] [精准一次](../../concept/connector-v2-features.md) +- [x] [cdc](../../concept/connector-v2-features.md) + +## 描述 + +该接收器用于将数据写入到StarRocks中。支持批和流两种模式。 +StarRocks数据接收器内部实现采用了缓存,通过stream load将数据批导入。 + +## 接收器选项 + +| 名称 | 类型 | 是否必须 | 默认值 | Description | +|-----------------------------|---------|------|------------------------------|---------------------------------------------------------------------------------------------------------------------| +| nodeUrls | list | yes | - | `StarRocks`集群地址, 格式为 `["fe_ip:fe_http_port", ...]` | +| base-url | string | yes | - | JDBC URL样式的连接信息。如:`jdbc:mysql://localhost:9030/` 或 `jdbc:mysql://localhost:9030` 或 `jdbc:mysql://localhost:9030/db` | +| username | string | yes | - | 目标`StarRocks` 用户名 | +| password | string | yes | - | 目标`StarRocks` 密码 | +| database | string | yes | - | 指定目标 StarRocks 表所在的数据库的名称 | +| table | string | no | - | 指定目标 StarRocks 表的名称, 如果没有设置该值,则表名与上游表名相同 | +| labelPrefix | string | no | - | StarRocks stream load作业标签前缀 | +| batch_max_rows | long | no | 1024 | 在批写情况下,当缓冲区数量达到`batch_max_rows`数量或`batch_max_bytes`字节大小或者时间达到`checkpoint.interval`时,数据会被刷新到StarRocks | +| batch_max_bytes | int | no | 5 * 1024 * 1024 | 在批写情况下,当缓冲区数量达到`batch_max_rows`数量或`batch_max_bytes`字节大小或者时间达到`checkpoint.interval`时,数据会被刷新到StarRocks | +| max_retries | int | no | - | 数据写入StarRocks失败后的重试次数 | +| retry_backoff_multiplier_ms | int | no | - | 用作生成下一个退避延迟的乘数 | +| max_retry_backoff_ms | int | no | - | 向StarRocks发送重试请求之前的等待时长 | +| enable_upsert_delete | boolean | no | false | 是否开启upsert/delete事件的同步,仅仅支持主键模型的表 | +| save_mode_create_template | string | no | 参见表下方的说明 | 参见表下方的说明 | +| starrocks.config | map | no | - | stream load `data_desc`参数 | +| http_socket_timeout_ms | int | no | 180000 | http socket超时时间,默认为3分钟 | +| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | 在同步任务打开之前,针对目标端已存在的表结构选择不同的处理方法 | +| data_save_mode | Enum | no | APPEND_DATA | 在同步任务打开之前,针对目标端已存在的数据选择不同的处理方法 | +| custom_sql | String | no | - | 当data_save_mode设置为CUSTOM_PROCESSING时,必须同时设置CUSTOM_SQL参数。CUSTOM_SQL的值为可执行的SQL语句,在同步任务开启前SQL将会被执行 | + +### save_mode_create_template + +StarRocks数据接收器使用模板,在需求需要的时候也可以修改模板,并结合上游数据类型和结构生成表的创建语句来自动创建StarRocks表。当前仅在多表模式下有效。 + +默认模板如下: + +```sql +CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` ( +${rowtype_primary_key}, +${rowtype_fields} +) ENGINE=OLAP +PRIMARY KEY (${rowtype_primary_key}) +DISTRIBUTED BY HASH (${rowtype_primary_key})PROPERTIES ( +"replication_num" = "1" +) +``` + +在模板中添加自定义字段,比如说加上`id`字段的修改模板如下: + +```sql +CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` +( + id, + ${rowtype_fields} +) ENGINE = OLAP DISTRIBUTED BY HASH (${rowtype_primary_key}) + PROPERTIES +( + "replication_num" = "1" +); +``` + +StarRocks数据接收器根据上游数据自动获取相应的信息来填充模板,并且会移除`rowtype_fields`中的id字段信息。使用此方法可用来为自定义字段修改类型及相关属性。 + +可以使用的占位符有: + +- database: 上游数据模式的库名称 +- table_name: 上游数据模式的表名称 +- rowtype_fields: 上游数据模式的所有字段信息,连接器会将字段信息自动映射到StarRocks对应的类型 +- rowtype_primary_key: 上游数据模式的主键信息,结果可能是列表 +- rowtype_unique_key: 上游数据模式的唯一键信息,结果可能是列表 + +### table [string] + +使用选项参数`database`和`table-name`自动生成SQL,并接收上游输入数据写入StarRocks中。 + +此选项与 `query` 是互斥的,具具有更高的优先级。 + +table选项参数可以填入一任意表名,这个名字最终会被用作目标表的表名,并且支持变量(`${table_name}`,`${schema_name}`)。 +替换规则如下:`${schema_name}` 将替换传递给目标端的 SCHEMA 名称,`${table_name}` 将替换传递给目标端的表名。 + +例如: +1. test_${schema_name}_${table_name}_test +2. sink_sinktable +3. ss_${table_name} + +### schema_save_mode[Enum] + +在同步任务打开之前,针对目标端已存在的表结构选择不同的处理方法。可选值有: +`RECREATE_SCHEMA` :不存在的表会直接创建,已存在的表会删除并根据参数重新创建 +`CREATE_SCHEMA_WHEN_NOT_EXIST` :忽略已存在的表,不存在的表会直接创建 +`ERROR_WHEN_SCHEMA_NOT_EXIST` :当有不存在的表时会直接报错 + +### data_save_mode[Enum] + +在同步任务打开之前,针对目标端已存在的数据选择不同的处理方法。可选值有: +`DROP_DATA`: 保存数据库结构,但是会删除表中存量数据 +`APPEND_DATA`:保存数据库结构和相关的表存量数据 +`CUSTOM_PROCESSING`:自定义处理 +`ERROR_WHEN_DATA_EXISTS`:当对应表存在数据时直接报错 + +### custom_sql[String] + +当data_save_mode设置为CUSTOM_PROCESSING时,必须同时设置CUSTOM_SQL参数。CUSTOM_SQL的值为可执行的SQL语句,在同步任务开启前SQL将会被执行。 + +## 数据类型映射 + +| StarRocks数据类型 | SeaTunnel数据类型 | +|---------------|---------------| +| BOOLEAN | BOOLEAN | +| TINYINT | TINYINT | +| SMALLINT | SMALLINT | +| INT | INT | +| BIGINT | BIGINT | +| FLOAT | FLOAT | +| DOUBLE | DOUBLE | +| DECIMAL | DECIMAL | +| DATE | STRING | +| TIME | STRING | +| DATETIME | STRING | +| STRING | STRING | +| ARRAY | STRING | +| MAP | STRING | +| BYTES | STRING | + +#### 支持导入的数据格式 + +StarRocks数据接收器支持的格式有CSV和JSON格式。 + +## 任务示例 + +### 简单示例 + +> 接下来给出一个示例,该示例包含多种数据类型的数据写入,且用户需要为目标端下游创建相应表 + +```hocon +env { + parallelism = 1 + job.mode = "BATCH" + checkpoint.interval = 10000 +} + +source { + FakeSource { + row.num = 10 + map.size = 10 + array.size = 10 + bytes.length = 10 + string.length = 10 + schema = { + fields { + c_map = "map>" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(16, 1)" + c_null = "null" + c_bytes = bytes + c_date = date + c_timestamp = timestamp + } + } + } +} + +sink { + StarRocks { + nodeUrls = ["e2e_starRocksdb:8030"] + username = root + password = "" + database = "test" + table = "e2e_table_sink" + batch_max_rows = 10 + starrocks.config = { + format = "JSON" + strip_outer_array = true + } + } +} +``` + +### 支持写入cdc变更事件(INSERT/UPDATE/DELETE)示例 + +```hocon +sink { + StarRocks { + nodeUrls = ["e2e_starRocksdb:8030"] + username = root + password = "" + database = "test" + table = "e2e_table_sink" + ... + + // 支持upsert/delete事件的同步(需要将选项参数enable_upsert_delete设置为true),仅支持表引擎为主键模型 + enable_upsert_delete = true + } +} +``` + +### JSON格式数据导入示例 + +``` +sink { + StarRocks { + nodeUrls = ["e2e_starRocksdb:8030"] + username = root + password = "" + database = "test" + table = "e2e_table_sink" + batch_max_rows = 10 + starrocks.config = { + format = "JSON" + strip_outer_array = true + } + } +} + +``` + +### CSV格式数据导入示例 + +``` +sink { + StarRocks { + nodeUrls = ["e2e_starRocksdb:8030"] + username = root + password = "" + database = "test" + table = "e2e_table_sink" + batch_max_rows = 10 + starrocks.config = { + format = "CSV" + column_separator = "\\x01" + row_delimiter = "\\x02" + } + } +} +``` + +### 使用save_mode的示例 + +``` +sink { + StarRocks { + nodeUrls = ["e2e_starRocksdb:8030"] + username = root + password = "" + database = "test" + table = "test_${schema_name}_${table_name}" + schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" + data_save_mode="APPEND_DATA" + batch_max_rows = 10 + starrocks.config = { + format = "CSV" + column_separator = "\\x01" + row_delimiter = "\\x02" + } + } +} +``` + +## 变更日志 + +### 随后版本 + +- 增加StarRocks数据接收器 +- [Improve] 将连接器自定义配置前缀的数据类型更改为Map [3719](https://github.com/apache/seatunnel/pull/3719) +- [Feature] 支持写入cdc变更事件(INSERT/UPDATE/DELETE) [3865](https://github.com/apache/seatunnel/pull/3865) +