Skip to content

Commit

Permalink
Merge branch 'dev' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
SplitfireUptown authored Jan 20, 2025
2 parents c129075 + 141dc16 commit 35bb23c
Show file tree
Hide file tree
Showing 8 changed files with 1,199 additions and 2 deletions.
2 changes: 1 addition & 1 deletion docs/en/connector-v2/sink/Doris.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ The internal implementation of Doris sink connector is cached and imported by st
| table | String | Yes | - | The table name of `Doris` table, use `${table_name}` to represent the upstream table name |
| table.identifier | String | Yes | - | The name of `Doris` table, it will deprecate after version 2.3.5, please use `database` and `table` instead. |
| sink.label-prefix | String | Yes | - | The label prefix used by stream load imports. In the 2pc scenario, global uniqueness is required to ensure the EOS semantics of SeaTunnel. |
| sink.enable-2pc | bool | No | false | Whether to enable two-phase commit (2pc), the default is false. For two-phase commit, please refer to [here](https://doris.apache.org/docs/dev/sql-manual/sql-statements/data-modification/load-and-export/SHOW-STREAM-LOAD). |
| sink.enable-2pc | bool | No | false | Whether to enable two-phase commit (2pc), the default is false. For two-phase commit, please refer to [here](https://doris.apache.org/docs/data-operate/transaction?_highlight=two&_highlight=phase#stream-load-2pc). |
| sink.enable-delete | bool | No | - | Whether to enable deletion. This option requires Doris table to enable batch delete function (0.15+ version is enabled by default), and only supports Unique model. you can get more detail at this [link](https://doris.apache.org/docs/dev/data-operate/delete/batch-delete-manual/) |
| sink.check-interval | int | No | 10000 | check exception with the interval while loading |
| sink.max-retries | int | No | 3 | the max retry times if writing records to database failed |
Expand Down
2 changes: 1 addition & 1 deletion docs/zh/connector-v2/sink/Doris.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ Doris Sink连接器的内部实现是通过stream load批量缓存和导入的
| table | String | Yes | - | `Doris` 表名, 使用 `${table_name}` 表示上游表名。 |
| table.identifier | String | Yes | - | `Doris` 表的名称,2.3.5 版本后将弃用,请使用 `database``table` 代替。 |
| sink.label-prefix | String | Yes | - | stream load导入使用的标签前缀。 在2pc场景下,需要全局唯一性来保证SeaTunnel的EOS语义。 |
| sink.enable-2pc | bool | No | false | 是否启用两阶段提交(2pc),默认为 false。 对于两阶段提交,请参考[此处](https://doris.apache.org/docs/dev/sql-manual/sql-statements/data-modification/load-and-export/SHOW-STREAM-LOAD)|
| sink.enable-2pc | bool | No | false | 是否启用两阶段提交(2pc),默认为 false。 对于两阶段提交,请参考[此处](https://doris.apache.org/docs/data-operate/transaction?_highlight=two&_highlight=phase#stream-load-2pc)|
| sink.enable-delete | bool | No | - | 是否启用删除。 该选项需要Doris表开启批量删除功能(0.15+版本默认开启),且仅支持Unique模型。 您可以在此[link](https://doris.apache.org/docs/dev/data-operate/delete/batch-delete-manual/)获得更多详细信息 |
| sink.check-interval | int | No | 10000 | 加载过程中检查异常时间间隔。 |
| sink.max-retries | int | No | 3 | 向数据库写入记录失败时的最大重试次数。 |
Expand Down
144 changes: 144 additions & 0 deletions docs/zh/connector-v2/sink/Neo4j.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
# Neo4j

> Neo4j 写连接器
## 描述

写数据到 `Neo4j`

`neo4j-java-driver` version 4.4.9

## 主要功能

- [ ] [精确一次](../../concept/connector-v2-features.md)

## 配置选项

| 名称 | 类型 | 是否必须 | 默认值 |
|----------------------------|---------|------|----------|
| uri | String || - |
| username | String || - |
| password | String || - |
| max_batch_size | Integer || - |
| write_mode | String || OneByOne |
| bearer_token | String || - |
| kerberos_ticket | String || - |
| database | String || - |
| query | String || - |
| queryParamPosition | Object || - |
| max_transaction_retry_time | Long || 30 |
| max_connection_timeout | Long || 30 |
| common-options | config || - |

### uri [string]

`Neo4j`数据库的URI,参考配置: `neo4j://localhost:7687`

### username [string]

`Neo4j`用户名。

### password [string]

`Neo4j`密码。如果提供了“用户名”,则需要。

### max_batch_size[Integer]

`max_batch_size` 是指写入数据时,单个事务中可以写入的最大数据条目数。

### write_mode

默认值为 `oneByOne` ,如果您想批量写入,请将其设置为`Batch`

```cypher
unwind $ttt as row create (n:Label) set n.name = row.name,n.age = rw.age
```

`ttt`代表一批数据。,`ttt`可以是任意字符串,只要它与配置的`batch_data_variable` 匹配。

### bearer_token [string]

`Neo4j``base64`编码`bearer token`用于鉴权。

### kerberos_ticket [string]

`Neo4j``base64`编码`kerberos ticket`用于鉴权。

### database [string]

数据库名称。

### query [string]

查询语句。包含在运行时用相应值替换的参数占位符。

### queryParamPosition [object]

查询参数的位置映射信息。

键名是参数占位符名称。

关联值是字段在输入数据行中的位置。

### max_transaction_retry_time [long]

最大事务重试时间(秒)。如果超过,则交易失败。

### max_connection_timeout [long]

等待TCP连接建立的最长时间(秒)。

### common options

Sink插件常用参数, 详细信息请参考 [Sink公共配置](../sink-common-options.md)

## OneByOne模式写示例

```
sink {
Neo4j {
uri = "neo4j://localhost:7687"
username = "neo4j"
password = "1234"
database = "neo4j"
max_transaction_retry_time = 10
max_connection_timeout = 10
query = "CREATE (a:Person {name: $name, age: $age})"
queryParamPosition = {
name = 0
age = 1
}
}
}
```

## Batch模式写示例
> cypher提供的`unwind`关键字支持批量写入,
> 批量数据的默认变量是batch。如果你写一个批处理写语句,
> 那么你应该声明 cypher `unwind $batch` 作为行
```
sink {
Neo4j {
uri = "bolt://localhost:7687"
username = "neo4j"
password = "neo4j"
database = "neo4j"
max_batch_size = 1000
write_mode = "BATCH"
max_transaction_retry_time = 3
max_connection_timeout = 10
query = "unwind $batch as row create(n:MyLabel) set n.name = row.name,n.age = row.age"
}
}
```

## Changelog

### 2.2.0-beta 2022-09-26

- 添加 Neo4j 写连接器

### issue ##4835

- 写连接器支持批量写入

Loading

0 comments on commit 35bb23c

Please sign in to comment.