diff --git a/directory.json b/directory.json index 296029c..309257b 100644 --- a/directory.json +++ b/directory.json @@ -553,6 +553,10 @@ } ] }, + { + "title": "Neuron", + "path": "streaming-processing/neuron" + }, { "title": "MQTT", "path": "streaming-processing/mqtt" @@ -570,34 +574,33 @@ "path": "streaming-processing/memory" }, { - "title": "Neuron", - "path": "streaming-processing/neuron" + "title": "SQL", + "path": "streaming-processing/sql" }, { "title": "文件", "path": "streaming-processing/file" }, - { - "title": "SQL", - "path": "streaming-processing/sql" - }, { "title": "Video", "path": "streaming-processing/video" - }, - { - "title": "Redis", - "path": "streaming-processing/redis" } ] }, { - "title": "规则管理", - "path": "streaming-processing/rules" - }, - { - "title": "规则管道", - "path": "streaming-processing/rule_pipeline" + "title": "规则", + "path": "streaming-processing/rules", + "collapsed": true, + "children": [ + { + "title": "规则管理", + "path": "streaming-processing/rule_status" + }, + { + "title": "规则流水线", + "path": "streaming-processing/rule_pipeline" + } + ] }, { "title": "动作(Sink)", @@ -623,14 +626,6 @@ "title": "REST", "path": "streaming-processing/sink/rest" }, - { - "title": "Redis", - "path": "streaming-processing/sink/redis" - }, - { - "title": "文件", - "path": "streaming-processing/sink/file" - }, { "title": "内存", "path": "streaming-processing/sink/memory" @@ -639,16 +634,12 @@ "title": "Log", "path": "streaming-processing/sink/log" }, - { - "title": "Nop", - "path": "streaming-processing/sink/nop" - }, { "title": "SQL", "path": "streaming-processing/sink/sql" }, { - "title": "InfluxDB", + "title": "InfluxDB V1", "path": "streaming-processing/sink/influx" }, { @@ -656,8 +647,12 @@ "path": "streaming-processing/sink/influx2" }, { - "title": "Kafka", - "path": "streaming-processing/sink/kafka" + "title": "文件", + "path": "streaming-processing/sink/file" + }, + { + "title": "Nop", + "path": "streaming-processing/sink/nop" } ] } diff --git a/en_US/streaming-processing/file.md b/en_US/streaming-processing/file.md index 1b461c9..2682041 100644 --- a/en_US/streaming-processing/file.md +++ b/en_US/streaming-processing/file.md @@ -68,8 +68,6 @@ ECP Edge 目前支持两种方式上传配置文件:上传文件或者提供 - **时间戳格式**:指定时间戳格式。 - **共享**:勾选确认是否共享源。 -您也可选择通过文本模式进行上述配置,通过 SQL 定义。 - ## 创建扫描表 File 源支持查询表。登录 ECP Edge,点击**数据流处理** -> **源管理**。在**扫描表**页签,点击**创建扫描表**。 @@ -87,15 +85,6 @@ File 源支持查询表。登录 ECP Edge,点击**数据流处理** -> **源 - **保留大小**:指定表快照的大小,用于存储历史数据。 -您也可选择通过文本模式进行配置,通过 SQL 定义,例如 - -```sql -CREATE TABLE table1 ( - name STRING, - size BIGINT, - id BIGINT -) WITH (DATASOURCE="lookup.json", FORMAT="json", TYPE="file"); -``` ## 示例 diff --git a/en_US/streaming-processing/http_pull.md b/en_US/streaming-processing/http_pull.md index db15081..3f53a0a 100644 --- a/en_US/streaming-processing/http_pull.md +++ b/en_US/streaming-processing/http_pull.md @@ -43,7 +43,7 @@ ECP Edge 默认支持 HTTP Pull 源,该支持可从 HTTP 服务器代理提取 - **跳过证书验证**:控制是否跳过证书认证。如设置为 True,将跳过证书认证;否则进行证书验证。 - - **HTTP 标头**: 需要与 HTTP 请求一起发送的 HTTP 请求标头。可通过文本模式或可视化模式进行配置。 + - **HTTP 标头**: 需要与 HTTP 请求一起发送的 HTTP 请求标头。 - **响应类型**: 响应类型,可以是 `code` 或者 `body`,如果是 `code`,那么 ECP Edge 会检查 HTTP 响应码来判断响应状态。如果是 `body`,那么 ECP Edge 会检查 HTTP 响应正文,要求其为 JSON 格式,并且检查 code 字段的值。 @@ -83,8 +83,6 @@ HTTP Pull 源支持查询表。登录 ECP Edge,点击**数据流处理** -> ** - **保留大小**:指定保留大小。 -您也可选择通过文本模式进行配置,通过 SQL 定义。 - ## OAuth 定义类 OAuth 的认证流程。其他的认证方式如 apikey 可以直接在 headers 设置密钥,不需要使用这个配置。 diff --git a/en_US/streaming-processing/http_push.md b/en_US/streaming-processing/http_push.md index 4a8fc35..5ec8301 100644 --- a/en_US/streaming-processing/http_push.md +++ b/en_US/streaming-processing/http_push.md @@ -74,4 +74,3 @@ HTTP Push 源支持查询表。登录 ECP Edge,点击**数据流处理** -> ** - **保留大小**:指定保留大小。 -您也可选择通过文本模式进行配置,通过 SQL 定义。 diff --git a/en_US/streaming-processing/memory.md b/en_US/streaming-processing/memory.md index a2671ef..16777b8 100644 --- a/en_US/streaming-processing/memory.md +++ b/en_US/streaming-processing/memory.md @@ -25,7 +25,6 @@ - **时间戳格式**:指定时间戳格式。 - **共享**:勾选确认是否共享源。 -您也可选择通过文本模式进行配置,通过 SQL 定义。 ## 主题通配符 @@ -56,7 +55,6 @@ - **保留大小**:指定保留大小。 -您也可选择通过文本模式进行配置,通过 SQL 定义。 ## 创建查询表 @@ -77,9 +75,3 @@ - **主键**:指定主键。 -您也可选择通过文本模式进行配置,通过 SQL 定义,例如: - -```sql -CREATE TABLE alertTable() WITH (DATASOURCE="topicName", TYPE="memory", KIND="lookup", KEY="id") -``` - diff --git a/en_US/streaming-processing/mqtt.md b/en_US/streaming-processing/mqtt.md index 343a0c0..36cad7b 100644 --- a/en_US/streaming-processing/mqtt.md +++ b/en_US/streaming-processing/mqtt.md @@ -64,5 +64,3 @@ MQTT 源支持查询表。登录 ECP Edge,点击**数据流处理** -> **源 - 如选择 delimited,还应配置分隔符,如 "," - **保留大小**:指定保留大小。 - -您也可选择通过文本模式进行配置,通过 SQL 定义。 \ No newline at end of file diff --git a/en_US/streaming-processing/neuron.md b/en_US/streaming-processing/neuron.md index 0f1feac..7de70d0 100644 --- a/en_US/streaming-processing/neuron.md +++ b/en_US/streaming-processing/neuron.md @@ -80,4 +80,3 @@ MQTT 源支持查询表。登录 ECP Edge,点击**数据流处理** -> **源 - **保留大小**:指定保留大小。 -您也可选择通过文本模式进行配置,通过 SQL 定义。 \ No newline at end of file diff --git a/en_US/streaming-processing/sink/sink.md b/en_US/streaming-processing/sink/sink.md index f83b6a2..1d4ecf0 100644 --- a/en_US/streaming-processing/sink/sink.md +++ b/en_US/streaming-processing/sink/sink.md @@ -10,7 +10,6 @@ - [Neuron](./neuron.md):输出到本地的 Neuron 实例。 - [EdgeX](./edgex.md):输出到 EdgeX Foundry。此动作仅在启用 edgex 编译标签时存在。 - [Rest](./rest.md):输出到外部 http 服务器。 -- [Redis](./redis.md): 写入 Redis 。 - [File](./file.md): 写入文件。 - [Memory](./memory.md):输出到 eKuiper 内存主题以形成规则管道。 - [Log](./log.md):写入日志,通常只用于调试。 diff --git a/zh_CN/admin/log-management.md b/zh_CN/admin/log-management.md index 257b37d..c9ab169 100644 --- a/zh_CN/admin/log-management.md +++ b/zh_CN/admin/log-management.md @@ -76,7 +76,7 @@ NeuronEX 支持开启/关闭某个驱动节点的 debug 日志,方便用户调 tail -f tail -f modbus-plus-tcp.log ``` -数据处理引擎日志查看命令为 +数据处理模块日志查看命令为 ```shell tail -f /opt/neuronex/software/ekuiper/log/stream.log diff --git a/zh_CN/streaming-processing/_assets/config_sink.png b/zh_CN/streaming-processing/_assets/config_sink.png new file mode 100644 index 0000000..f3c0556 Binary files /dev/null and b/zh_CN/streaming-processing/_assets/config_sink.png differ diff --git a/zh_CN/streaming-processing/_assets/config_source.png b/zh_CN/streaming-processing/_assets/config_source.png new file mode 100644 index 0000000..019a7b2 Binary files /dev/null and b/zh_CN/streaming-processing/_assets/config_source.png differ diff --git a/zh_CN/streaming-processing/_assets/install_sql_source.png b/zh_CN/streaming-processing/_assets/install_sql_source.png deleted file mode 100644 index d49afe2..0000000 Binary files a/zh_CN/streaming-processing/_assets/install_sql_source.png and /dev/null differ diff --git a/zh_CN/streaming-processing/_assets/neuron_dataprocessing.png b/zh_CN/streaming-processing/_assets/neuron_dataprocessing.png new file mode 100644 index 0000000..f9c5643 Binary files /dev/null and b/zh_CN/streaming-processing/_assets/neuron_dataprocessing.png differ diff --git a/zh_CN/streaming-processing/_assets/rule_list.png b/zh_CN/streaming-processing/_assets/rule_list.png new file mode 100644 index 0000000..d07352f Binary files /dev/null and b/zh_CN/streaming-processing/_assets/rule_list.png differ diff --git a/zh_CN/streaming-processing/_assets/source_datastructure.png b/zh_CN/streaming-processing/_assets/source_datastructure.png new file mode 100644 index 0000000..e3ca348 Binary files /dev/null and b/zh_CN/streaming-processing/_assets/source_datastructure.png differ diff --git a/zh_CN/streaming-processing/_assets/source_neuronStream.png b/zh_CN/streaming-processing/_assets/source_neuronStream.png new file mode 100644 index 0000000..b414596 Binary files /dev/null and b/zh_CN/streaming-processing/_assets/source_neuronStream.png differ diff --git a/zh_CN/streaming-processing/config.md b/zh_CN/streaming-processing/config.md index 2ac048f..f926ea5 100644 --- a/zh_CN/streaming-processing/config.md +++ b/zh_CN/streaming-processing/config.md @@ -1,19 +1,22 @@ # 配置 -本页面介绍如何进行资源配置,如配置组和传输与存储模版、连接、模式以及文件管理。 +本页面介绍如何进行资源配置以及模式配置。 ## 资源 -资源包括源配置组和传输与存储模版两部分内容。 +资源包括`源配置组`和`传输与存储模版`两部分内容。 ### 源配置组 每个源都会有自己的配置文件,用户在创建流/表时可同步进行相应配置。您可在**数据流处理** -> **配置** -> **资源** 的**源配置组**页签查看已添加的配置,编辑相关操作,或删除配置组,也可点击右上角的**添加配置组**按钮进行添加。 +config_source ### 传输与存储模版 -每个 Sink (数据汇)对应的传输与存储模版,您可在**数据流处理** -> **配置** -> **资源** 的**传输与存储模版**页签查看已添加的配置,编辑相关操作,或删除配置组,也可点击右上角的**添加传输与存储模版**按钮进行添加。 +每个动作(Sink)对应的传输与存储模版,您可在**数据流处理** -> **配置** -> **资源** 的**传输与存储模版**页签查看已添加的配置,编辑相关操作,或删除配置组,也可点击右上角的**添加传输与存储模版**按钮进行添加。 + +config_sink ## 模式 diff --git a/zh_CN/streaming-processing/file.md b/zh_CN/streaming-processing/file.md index cdc27c5..180d9ce 100644 --- a/zh_CN/streaming-processing/file.md +++ b/zh_CN/streaming-processing/file.md @@ -7,10 +7,9 @@ +NeuronEX 数据处理模块通过 `File` 类型的数据源,可以接收来自文件的数据。File 类型可以作为流、扫描表的数据源,支持监控文件或文件夹。当监控对象为文件夹时,NeuronEX 会按照文件名的字母顺序来读取文件。 -NeuronEX 默认支持 File 类型数据源,File 类型可以作为流、扫描表的数据源,支持监控文件或文件夹。当监控对象为文件夹时,NeuronEX 会按照文件名的字母顺序来读取文件。 ::: tip - 如果被监控的位置是一个文件夹,该文件夹内的文件类型必须相同。 ::: @@ -22,7 +21,7 @@ NeuronEX 默认支持 File 类型数据源,File 类型可以作为流、扫描 有些文件可能有大部分数据是标准格式,但在文件的开头和结尾行有一些元数据。用户可以使用 **文件开头忽略的行数**(`ignoreStartLines`) 和 文件结尾忽略的行数 (`ignoreEndLines`) 参数来删除非标准的开头和结尾的非标准部分,以便解析上述文件类型。 -## 准备文件 + ## 创建流 @@ -41,9 +40,7 @@ NeuronEX 目前支持两种方式上传配置文件:上传文件或者提供 在弹出的**源管理** / **创建**页面,进入如下配置: - **流名称**:输入流名称 -- **是否为带结构的流**:勾选确认是否为带结构的流,如为带结构的流,则需进一步添加流字段 - - **名称**:字段名称 - - **类型**:支持 bigint、float、string、datetime、boolean、array、struct、bytea +- **是否为带结构的流**:勾选确认是否为带结构的流,如为带结构的流,则需进一步添加流字段。可默认不勾选。 - **流类型**:选择 file。 - **数据源**:指定文件或目录的相对地址。注:请输入不含路径的文件名,例如 `my.json`。 - **配置组**:可使用默认配置组,如希望自定义配置组,可点击添加配置组按钮,在弹出的对话框中进行如下设置,设置完成后,可点击**测试连接**进行测试: @@ -59,41 +56,21 @@ NeuronEX 目前支持两种方式上传配置文件:上传文件或者提供 - **文件开头忽略的行数**:忽略文件开头的几行。例如,如果设置为 3,那么文件的前三行将被忽略。 - **文件结尾忽略的行数**: 忽略文件结尾的几行。注意,文件的最后空行不计算在内。 - **流格式**:支持 json、binary、protobuf、delimited、custom。 - - 如选择 protobuf 或 custom,还应配置对应的[模式和模式消息](./config.md#模式) - - 如选择 delimited,还应配置分隔符,如 "," -- **时间戳字段**:指定代表时间的字段。 -- **时间戳格式**:指定时间戳格式。 - **共享**:勾选确认是否共享源。 -您也可选择通过文本模式进行上述配置,通过 SQL 定义。 - ## 创建扫描表 -File 源支持查询表。登录 NeuronEX,点击**数据流处理** -> **源管理**。在**扫描表**页签,点击**创建扫描表**。 +File 源支持扫描表。登录 NeuronEX,点击**数据流处理** -> **源管理**。在**扫描表**页签,点击**创建扫描表**。 - **表名称**:输入表名称 -- **是否为带结构的表**:勾选确认是否为带结构的表,如为带结构的表,则需进一步添加表字段 - - **名称**:字段名称 - - **类型**:支持 bigint、float、string、datetime、boolean、array、struct、bytea +- **是否为带结构的表**:勾选确认是否为带结构的表,如为带结构的表,则需进一步添加表字段。可默认不勾选。 - **表类型**:选择 file。 - **数据源**:指定文件或目录的相对地址。注:请输入不含路径的文件名,例如 test.json。 - **配置组**:可使用默认配置组,如希望自定义配置组,可参考[创建流](#创建流)部分 - **表格式**:支持 json、binary、delimited、custom。 - - 如选择 custom,还应配置对应的[模式和模式消息](./config.md#模式) - - 如选择 delimited,还应配置分隔符,如 "," - - **保留大小**:指定表快照的大小,用于存储历史数据。 -您也可选择通过文本模式进行配置,通过 SQL 定义,例如 - -```sql -CREATE TABLE table1 ( - name STRING, - size BIGINT, - id BIGINT -) WITH (DATASOURCE="lookup.json", FORMAT="json", TYPE="file"); -``` ## 示例 diff --git a/zh_CN/streaming-processing/http_pull.md b/zh_CN/streaming-processing/http_pull.md index 9dd3068..8ce1741 100644 --- a/zh_CN/streaming-processing/http_pull.md +++ b/zh_CN/streaming-processing/http_pull.md @@ -2,7 +2,7 @@ 扫描表 -NeuronEX 默认支持 HTTP Pull 源,该支持可从 HTTP 服务器代理提取消息并输入 NeuronEX,该类型可以作为流、扫描表的数据源。 +NeuronEX 数据处理模块通过 `HTTP Pull` 类型的数据源,可以从 HTTP 服务器获取数据,该类型可以作为流、扫描表的数据源。 ## 创建流 @@ -11,12 +11,9 @@ NeuronEX 默认支持 HTTP Pull 源,该支持可从 HTTP 服务器代理提取 在弹出的**源管理** / **创建**页面,进入如下配置: - **流名称**:输入流名称 -- **是否为带结构的流**:勾选确认是否为带结构的流,如为带结构的流,则需进一步添加流字段 - - - **名称**:字段名称 - - **类型**:支持 bigint、float、string、datetime、boolean、array、struct、bytea +- **是否为带结构的流**:勾选确认是否为带结构的流,如为带结构的流,则需进一步添加流字段。可默认不勾选。 - **流类型**:选择 httppull。 -- **数据源**:指定 URL 的路径部分,与 URL 属性拼接成最终 URL, 例如 /api/data。 +- **数据源(URL拼接路径)**:指定 URL 的路径部分,与配置组中的 `路径` 属性拼接成最终 URL。 例如配置组中的 `路径` 属性添填写为 `http://127.0.0.1:7000`,**数据源(URL拼接路径)** 填写为`/api/data`,则HTTP 完整请求地址为:`http://127.0.0.1:7000/api/data` 。 - **配置组**:可使用默认配置组,如希望自定义配置组,可点击添加配置组按钮,在弹出的对话框中进行如下设置,设置完成后,可点击**测试连接**进行测试: - **名称**:输入配置组名称。 @@ -29,23 +26,23 @@ NeuronEX 默认支持 HTTP Pull 源,该支持可从 HTTP 服务器代理提取 - **超时时间**: HTTP 请求的超时时间,单位为毫秒。 - - **递增**: 如果设置为 True,将会与上一次的结果进行比较;如果连续两次请求的响应相同,则不会发送新的结果。可选值:True/False。 + - **递增**: 如果设置为 True,将会与上一次的结果进行比较;如果连续两次请求的响应相同,则不会发送新的结果。默认值为 False。 - **正文**:请求的正文,例如 `{"data": "data", "method": 1}`。 - **正文类型**: 正文类型,可以是 none、text、json、html、xml、javascript 或 form。 - - **证书类型**:证书路径。可以为绝对路径,也可以为相对路径。如果指定的是相对路径,那么父目录为执行 server 命令的路径。 + - **证书类型**:可选参数,填写证书路径,可以为绝对路径,也可以为相对路径。如果指定的是相对路径,那么父目录为执行 neuronex 命令的路径。示例值:`/var/xyz-certificate.pem` - - **私钥路径**:私钥路径。可以为绝对路径,也可以为相对路径。 + - **私钥路径**:可选参数,可以为绝对路径,也可以为相对路径。示例值:`/var/xyz-private.pem.key` - - **根证书路径**:根证书路径,用以验证服务器证书。可以为绝对路径,也可以为相对路径。 + - **根证书路径**:可选参数,用以验证服务器证书。可以为绝对路径,也可以为相对路径。示例值:`/var/xyz-rootca.pem` - **跳过证书验证**:控制是否跳过证书认证。如设置为 True,将跳过证书认证;否则进行证书验证。 - - **HTTP 标头**: 需要与 HTTP 请求一起发送的 HTTP 请求标头。可通过文本模式或可视化模式进行配置。 + - **HTTP 标头**: 需要与 HTTP 请求一起发送的 HTTP 请求标头。 - - **响应类型**: 响应类型,可以是 `code` 或者 `body`,如果是 `code`,那么 NeuronEX 会检查 HTTP 响应码来判断响应状态。如果是 `body`,那么 NeuronEX 会检查 HTTP 响应正文,要求其为 JSON 格式,并且检查 code 字段的值。 + - **响应类型**: 响应类型,可以是 `code` 或者 `body`,如果是 `code`,那么 NeuronEX 会检查 HTTP 响应码来判断响应状态。如果是 `body`,那么 NeuronEX 会检查 HTTP 响应正文,要求其为 JSON 格式,并且检查 code 字段的值。默认为 `code`。 - **oAuth**: 配置 OAuth 验证流程,关于 OAuth 的详细介绍,见 [OAuth](#oauth) @@ -58,14 +55,57 @@ NeuronEX 默认支持 HTTP Pull 源,该支持可从 HTTP 服务器代理提取 - url:刷新令牌的网址,总是使用 POST 方式请求。 - headers:用于刷新令牌的请求头。通常把令牌放在这里,用于授权。 - body:刷新令牌的请求主体。当使用头文件来传递刷新令牌时,可能不需要配置此选项。 -- **流格式**:支持 json、binary、protobuf、delimited、custom。 - - 如选择 protobuf 或 custom,还应配置对应的[模式和模式消息](./config.md#模式) - - 如选择 delimited,还应配置分隔符,如 "," - -- **时间戳字段**:指定代表时间的字段。 -- **时间戳格式**:指定时间戳格式。 +- **流格式**:选择默认 json 格式。 - **共享**:勾选确认是否共享源。 +示例配置如下: +```yaml + +default: + # 请求服务器地址的URL + url: http://localhost:7000 + # post, get, put, delete + method: post + # 请求之间的间隔,时间单位为 ms + interval: 10000 + # http请求超时,时间单位为 ms + timeout: 5000 + # 如果将其设置为 true,则将与最后的结果进行比较; 如果两个请求的响应相同,则将跳过发送结果。 + # 可能的设置可能是:true/false + incremental: false + # 请求正文,例如'{"data": "data", "method": 1}' + body: '{}' + # 正文类型, none、text、json、html、xml、javascript、form + bodyType: json + # 请求所需的HTTP标头 + headers: + Accept: application/json + # 如何检查响应状态,支持通过状态码或 body + responseType: code + # 获取 token +# oAuth: +# # 设置如何获取访问码 +# access: +# # 获取访问码的 URL,总是使用 POST 方法发送请求 +# url: https://127.0.0.1/api/token +# # 请求正文 +# body: '{"username": "admin","password": "123456"}' +# # 令牌的过期时间,以字符串表示,时间单位为秒,允许使用模板 +# expire: '3600' +# # 如何刷新令牌 +# refresh: +# # 刷新令牌的 URL,总是使用 POST 方法发送请求 +# url: https://127.0.0.1/api/refresh +# # HTTP 请求头,允许使用模板 +# headers: +# identityId: '{{.data.identityId}}' +# token: '{{.data.token}}' +# # 请求正文 +# body: '' + + +``` + ## 创建扫描表 HTTP Pull 源支持查询表。登录 NeuronEX,点击**数据流处理** -> **源管理**。在**扫描表**页签,点击**创建扫描表**。 @@ -75,37 +115,9 @@ HTTP Pull 源支持查询表。登录 NeuronEX,点击**数据流处理** -> ** - **名称**:字段名称 - **类型**:支持 bigint、float、string、datetime、boolean、array、struct、bytea - **表类型**:选择 httppull -- **数据源**:指定 URL 的路径部分,与 URL 属性拼接成最终 URL, 例如 /api/data。 +- **数据源(URL拼接路径)**:指定 URL 的路径部分,与配置组中的 `路径` 属性拼接成最终 URL。 例如配置组中的 `路径` 属性添填写为 `http://127.0.0.1:7000`,**数据源(URL拼接路径)** 填写为`/api/data`,则HTTP 完整请求地址为:`http://127.0.0.1:7000/api/data` 。 - **配置组**:可使用默认配置组,如希望自定义配置组,可参考[创建流](#创建流)部分 - **表格式**:支持 json、binary、delimited、custom。 - - 如选择 custom,还应配置对应的[模式和模式消息](./config.md#模式) - - 如选择 delimited,还应配置分隔符,如 "," - **保留大小**:指定保留大小。 -您也可选择通过文本模式进行配置,通过 SQL 定义。 - -## OAuth - -定义类 OAuth 的认证流程。其他的认证方式如 apikey 可以直接在 headers 设置密钥,不需要使用这个配置。 - -OAuth 2.0 是一个授权协议,让 API 客户端有限度地访问网络服务器上的用户数据。oAuth 最常见的流程是授权代码,大多用于服务器端和移动网络应用。在这个流程中,用户使用他们的账户登录到网络应用中,认证码会返回给应用。之后,应用程序可以使用认证码来请求访问令牌,并可能在到期后通过刷新令牌来刷新令牌。 - -在这个配置中,我们假设认证码已经获取了,用户只需指定令牌申请的过程,该过程可能需要该认证码或只是密码(OAuth 的变体)。 - -需要配置两个部分:用于获取访问代码的 access 配置和用于令牌刷新的 refresh 配置。其中,refresh 配置是可选的,只有存在单独的刷新流程时才需要配置。 - -### access - -- url:获取访问码的网址,总是使用 POST 方法访问。 -- body:获取访问令牌的请求主体。通常情况下,可在这里提供授权码。 -- expire:令牌的过期时间,时间单位是秒,允许使用模板,所以必须是一个字符串。 - -### refresh - -- url:刷新令牌的网址,总是使用 POST 方式请求。 -- headers:用于刷新令牌的请求头。通常把令牌放在这里,用于授权。 -- body:刷新令牌的请求主体。当使用头文件来传递刷新令牌时,可能不需要配置此选项。 - - - diff --git a/zh_CN/streaming-processing/http_push.md b/zh_CN/streaming-processing/http_push.md index 7b003a9..bcf1d39 100644 --- a/zh_CN/streaming-processing/http_push.md +++ b/zh_CN/streaming-processing/http_push.md @@ -1,36 +1,21 @@ # HTTP Push 源 扫描表 - -NeuronEX 默认支持 HTTP Push 源,它作为一个 HTTP 服务器,可以接收来自 HTTP 客户端的消息。所有的 HTTP 推送源共用单一的全局 HTTP 数据服务器。每个源可以有自己的 URL,这样就可以支持多个端点。该类型可以作为流、扫描表的数据源。 -配置分成两个部分:全局服务器配置和源配置。 +NeuronEX 数据处理模块通过 `HTTP Push` 类型的数据源,可以在内部启动一个 HTTP 服务器,默认地址为`http://0.0.0.0:10081`,接收来自 HTTP 客户端的消息,在所有规则中都可以共用这个 `HTTP Push`数据源。该类型可以作为流、扫描表的数据源。 -## 服务器配置 +:::tip 提示 +当有任何使用 `HTTP Push` 源的规则启动后, HTTP 服务器才会启动运行,10081端口开启。当所有使用 httppush 源的规则都关闭后, HTTP 服务器会关闭运行,10081端口关闭。 +::: -服务器配置在 `etc/kuiper.yaml` 中的 `source` 部分。 +如果使用 Docker 部署 NeuronEX,需要在 `docker run` 命令中添加 `-p 10081:10081` 参数,将容器内的 10081 端口映射到宿主机的 10081 端口后,才可正常使用 `HTTP Push`源 。 -```yaml -source: - ## Configurations for the global http data server for httppush source - # HTTP data service ip - httpServerIp: 0.0.0.0 - # HTTP data service port - httpServerPort: 10081 - # httpServerTls: - # certfile: /var/https-server.crt - # keyfile: /var/https-server.key +```bash +## run NeuronEX +$ docker run -d --name neuronex -p 8085:8085 -p 10081:10081 --log-opt max-size=100m emqx/neuronex:latest ``` -用户可以指定以下属性: - -- httpServerIp:用于绑定 http 数据服务器的IP。 -- httpServerPort:用于绑定 http 数据服务器的端口。 -- httpServerTls: http 服务器 TLS 的配置。 - -一旦有任何需要 httppush 源的规则启动,全局服务器就会启动。一旦所有引用的规则都关闭,它就会关闭。 - ## 创建流(源配置) 登录 NeuronEX,点击**数据流处理** -> **源管理**。在**流管理**页签,点击**创建流**。 @@ -38,37 +23,51 @@ source: 在弹出的**源管理** / **创建**页面,进入如下配置: - **流名称**:输入流名称 -- **是否为带结构的流**:勾选确认是否为带结构的流,如为带结构的流,则需进一步添加流字段 - - **名称**:字段名称 - - **类型**:支持 bigint、float、string、datetime、boolean、array、struct、bytea +- **是否为带结构的流**:勾选确认是否为带结构的流,如为带结构的流,则需进一步添加流字段。可默认不勾选。 - **流类型**:选择 httppush -- **数据源**:指定 URL 的路径部分,与 URL 属性拼接成最终 URL, 例如 /api/data。 -- **配置组**:可使用默认配置组,如希望自定义配置组,可点击添加配置组按钮,在弹出的对话框中进行如下设置,设置完成后,可点击**测试连接**进行测试: +- **数据源**:指定 URL 的路径部分,例如 /api/data。 + :::tip 提示 + httppush类型的数据源启动的 HTTP 服务器地址为`http://0.0.0.0:10081`,**数据源(URL拼接路径)** 填写为`/api/data`,则 HTTP 完整请求地址为:`http://0.0.0.0:10081/api/data` 。 + ::: +- **配置组**:可使用默认配置组,如希望自定义配置组,可点击添加配置组: - **名称**:输入配置组名称。 - **请求方法**:HTTP 请求方法,可以是 POST 或 PUT。 -- **流格式**:支持 json、binary、protobuf、delimited、custom。 - - 如选择 protobuf 或 custom,还应配置对应的[模式和模式消息](./config.md#模式) - - 如选择 delimited,还应配置分隔符,如 "," - -- **时间戳字段**:指定代表时间的字段。 -- **时间戳格式**:指定时间戳格式。 +- **流格式**:支持 json、binary、protobuf、delimited、custom。默认 json 格式。 - **共享**:勾选确认是否共享源。 + + ## 创建扫描表(源配置) HTTP Push 源支持查询表。登录 NeuronEX,点击**数据流处理** -> **源管理**。在**扫描表**页签,点击**创建扫描表**。 - **表名称**:输入表名称 -- **是否为带结构的表**:勾选确认是否为带结构的表,如为带结构的表,则需进一步添加表字段 - - **名称**:字段名称 - - **类型**:支持 bigint、float、string、datetime、boolean、array、struct、bytea +- **是否为带结构的表**:勾选确认是否为带结构的表,如为带结构的表,则需进一步添加表字段。可默认不勾选。 - **表类型**:选择 httppush。 - **数据源**:指定 URL 的路径部分,与 URL 属性拼接成最终 URL, 例如 /api/data。 -- **配置组**:可使用默认配置组,如希望自定义配置组,可参考[创建流](#创建流)部分 -- **表格式**:支持 json、binary、delimited、custom。 - - 如选择 custom,还应配置对应的[模式和模式消息](./config.md#模式) - - 如选择 delimited,还应配置分隔符,如 "," - +- **配置组**:可使用默认配置组,如希望自定义配置组,可参考[创建流](#创建流)部分。 +- **表格式**:支持 json、binary、protobuf、delimited、custom。默认 json 格式。 - **保留大小**:指定保留大小。 -您也可选择通过文本模式进行配置,通过 SQL 定义。 + + \ No newline at end of file diff --git a/zh_CN/streaming-processing/lookup.md b/zh_CN/streaming-processing/lookup.md index 273021e..78cf9bc 100644 --- a/zh_CN/streaming-processing/lookup.md +++ b/zh_CN/streaming-processing/lookup.md @@ -99,11 +99,8 @@ 流数据变化频繁,数据量大,通常只包含需要经常变化的数据;而不变或者变化较少的数据通常存储于数据库等外部存储中。在应用处理时,通常需要将流数据中缺少的静态数据补全。例如,流数据中包含了设备的 ID,但设备的具体名称,型号的描述数据存储于数据库中。本场景中,我们将介绍如何将流数据与批数据结合,进行自动数据补全。 -### SQL 插件安装和配置 +### SQL 插件配置 -本场景将使用 MySQL 作为外部表数据存储位置。NeuronEX 提供了预编译的 SQL source 插件,可访问 MySQL 数据并将其作为查询表。因此,在开始教程之前,我们需要先安装 SQL source 插件。在 NeuronEX Web 界面,可直接在插件管理中,点击创建插件,如下图选择 SQL source 插件进行安装。 - -Install SQL source 本场景将以 MySQL 为例,介绍如何与关系数据库进行连接。用户需要启动 MySQL 实例。在 MySQL 中创建表 `devices`, 其中包含 `id`, `name`, `deviceKind` 等字段并提前写入内容。 diff --git a/zh_CN/streaming-processing/memory.md b/zh_CN/streaming-processing/memory.md index 1218b67..9c97776 100644 --- a/zh_CN/streaming-processing/memory.md +++ b/zh_CN/streaming-processing/memory.md @@ -2,7 +2,9 @@ 扫描表  查询表 -内存源通过主题消费由内存目标生成的事件。该主题类似于 pubsub 主题,例如 mqtt,因此可能有多个内存目标发布到同一主题,也可能有多个内存源订阅同一主题。 内存动作的典型用途是形成规则管道。内存动作和内存源之间的数据传输采用内部格式,不经过编解码以提高效率。因此,内存源的`format`属性会被忽略。 +内存源获取 NeuronEX 流处理模块内存中保存的数据流事件。内存动作的典型用途是形成 [**规则流水线**](./rule_pipeline.md),将上一个规则处理的结果放在 [**内存 Sink**](./sink/memory.md) 中,然后再通过内存源获取数据,形成规则流水线。 + +内存源通过`数据源(主题)`消费由 内存 Sink 生成的事件,主题消费该主题类似于 pubsub 主题,例如 MQTT,因此可能有多个内存目标发布到同一主题,也可能有多个内存源订阅同一主题。详情查阅[主题通配符](#主题通配符) ## 创建流 @@ -11,25 +13,14 @@ 在弹出的**源管理** / **创建**页面,进入如下配置: - **流名称**:输入流名称 -- **是否为带结构的流**:勾选确认是否为带结构的流,如为带结构的流,则需进一步添加流字段 - - **名称**:字段名称 - - **类型**:支持 bigint、float、string、datetime、boolean、array、struct、bytea +- **是否为带结构的流**:勾选确认是否为带结构的流,如为带结构的流,则需进一步添加流字段。可默认不勾选。 - **流类型**:选择 memory -- **数据源**(主题):将要订阅的内存主题, 例如 topic1。类似 MQTT 主题 -- **配置组**:可使用默认配置组,主题没有配置属性,由流数据源属性指定,但您可点击添加配置组按钮,输入名称名称,定义一个新的配置组。 -- **流格式**:支持 json、binary、protobuf、delimited、custom。 - - 如选择 protobuf 或 custom,还应配置对应的[模式和模式消息](./config.md#模式) - - 如选择 delimited,还应配置分隔符,如 "," - -- **时间戳字段**:指定代表时间的字段。 -- **时间戳格式**:指定时间戳格式。 -- **共享**:勾选确认是否共享源。 +- **数据源(主题)**:将要订阅的内存主题, 例如 topic1。详情查阅[主题通配符](#主题通配符) -您也可选择通过文本模式进行配置,通过 SQL 定义。 ## 主题通配符 -内存源也支持主题通配符,与 mqtt 主题类似。 目前,支持两种通配符。 +内存源也支持主题通配符,与 MQTT 主题类似。 目前,支持两种通配符。 **+** : 单级通配符替换一个主题等级。 @@ -45,42 +36,30 @@ 内存源支持查询表。登录 NeuronEX,点击**数据流处理** -> **源管理**。在**扫描表**页签,点击**创建扫描表**。 - **表名称**:输入表名称 -- **是否为带结构的表**:勾选确认是否为带结构的表,如为带结构的表,则需进一步添加表字段 - - **名称**:字段名称 - - **类型**:支持 bigint、float、string、datetime、boolean、array、struct、bytea +- **是否为带结构的流**:勾选确认是否为带结构的流,如为带结构的流,则需进一步添加流字段。可默认不勾选。 - **表类型**:选择 memory -- **数据源**(主题):将要订阅的内存主题, 例如 topic1。类似 MQTT 主题。 -- **配置组**:可使用默认配置组,主题没有配置属性,由流数据源属性指定,但您可点击添加配置组按钮,输入名称名称,定义一个新的配置组。 -- **表格式**:支持 json、binary、delimited、custom。 - - 如选择 custom,还应配置对应的[模式和模式消息](./config.md#模式) - - 如选择 delimited,还应配置分隔符,如 "," - +- **数据源**(主题):将要订阅的内存主题, 例如 topic1。类似 MQTT 主题。详情查阅[主题通配符](#主题通配符) - **保留大小**:指定保留大小。 -您也可选择通过文本模式进行配置,通过 SQL 定义。 - ## 创建查询表 -内存源支持查询表。在创建一个内存查询表后,它将开始积累由键字段索引的内存主题的数据。它将一直独立于规则运行。每个主题和键对将有一个虚拟表的内存拷贝。所有引用同一表或具有相同主题/键对的内存表的规则将共享同一数据副本。它可以在内存中存储任何流类型的历史,以便其他流可以与之合作。 +内存源支持用作查询表,此时,主要具备如下优势: + +- **独立性**:内存查找表独立于任何规则操作。即使规则被修改或删除,内存查找表中的数据也不受影响。 +数据共享:如果多个规则引用相同的表,或者存在具有相同主题/键对的多个内存表,则它们全部共享相同的数据集,确保了不同规则之间的一致性,简化了数据访问。 +- **与内存 Sink 集成**:内存查找表可通过与可更新的 [内存 Sink](./sink/memory.md) 集成,保证内容的实时性。 +- **规则流水线**:内存查找表可以作为多个规则之间的桥梁,类似于规则流水线的概念。它使一个流能够将历史数据存储在内存中,其他流可以访问和利用这些数据,因此适用于需要结合历史数据和实时数据进行决策的场景。 登录 NeuronEX,点击**数据流处理** -> **源管理**。在**查询表**页签,点击**创建查询表**。 - **表名称**:输入表名称 -- **是否为带结构的表**:勾选确认是否为带结构的表,如为带结构的表,则需进一步添加表字段 - - **名称**:字段名称 - - **类型**:支持 bigint、float、string、datetime、boolean、array、struct、bytea +- **是否为带结构的表**:勾选确认是否为带结构的表,如为带结构的表,则需进一步添加表字段。可默认不勾选。 - **表类型**:选择 memory -- **数据源**(主题):将要订阅的内存主题, 例如 topic1。类似 MQTT 主题。 -- **配置组**:可使用默认配置组,主题没有配置属性,由流数据源属性指定,但您可点击添加配置组按钮,输入名称名称,定义一个新的配置组。 -- **表格式**:支持 json、binary、delimited、custom。 - - 如选择 custom,还应配置对应的[模式和模式消息](./config.md#模式) - - 如选择 delimited,还应配置分隔符,如 "," - +- **数据源**(主题):将要订阅的内存主题, 例如 topic1。类似 MQTT 主题。详情查阅[主题通配符](#主题通配符) - **主键**:指定主键。 -您也可选择通过文本模式进行配置,通过 SQL 定义,例如: +:::tip 提示 +作为查询表使用时,还应配置 KEY 属性,它将作为虚拟表的主键来加速查询。创建完成后,内存查找表将开始从指定的内存主题累积数据,并通过 KEY 字段进行索引,允许快速检索。 -```sql -CREATE TABLE alertTable() WITH (DATASOURCE="topicName", TYPE="memory", KIND="lookup", KEY="id") -``` +::: \ No newline at end of file diff --git a/zh_CN/streaming-processing/mqtt.md b/zh_CN/streaming-processing/mqtt.md index af905c2..723b74f 100644 --- a/zh_CN/streaming-processing/mqtt.md +++ b/zh_CN/streaming-processing/mqtt.md @@ -2,9 +2,7 @@ 扫描表 - - -NeuronEX 为 MQTT 源流提供了内置支持,流可以订阅来自 MQTT 代理的消息并输入 NeuronEX 处理。 +NeuronEX 数据处理模块通过 `MQTT` 类型的数据源,可以接收来自 MQTT Broker 的数据并通过规则处理分析。 ## 创建流 @@ -13,35 +11,27 @@ NeuronEX 为 MQTT 源流提供了内置支持,流可以订阅来自 MQTT 代 在弹出的**源管理** / **创建**页面,进入如下配置: - **流名称**:输入流名称 -- **是否为带结构的流**:勾选确认是否为带结构的流,如为带结构的流,则需进一步添加流字段 - - - **名称**:字段名称 - - **类型**:支持 bigint、float、string、datetime、boolean、array、struct、bytea +- **是否为带结构的流**:勾选确认是否为带结构的流,如为带结构的流,则需进一步添加流字段。可默认不勾选。 - **流类型**:选择 mqtt - **数据源**(MQTT 主题):将要订阅的 MQTT 主题, 例如 topic1。 -- **配置组**:可使用默认配置组,如希望自定义配置组,可点击添加配置组按钮,在弹出的对话框中进行如下设置,设置完成后,可点击**测试连接**进行测试: +- **配置组**:可编辑使用默认配置组,或点击添加配置组,在弹出的对话框中进行如下设置,设置完成后,可点击**测试连接**进行测试: - **名称**:输入配置组名称。 - - **服务器地址**:MQTT 消息代理的服务器。 - - **用户名**:MQTT 连接用户名。 - - **密码**:MQTT 连接密码。 - - **MQTT 协议版本**:MQTT 协议版本,支持 3.1 或者 3.1.1 ,默认为 3.1。 - + - **服务器地址**:MQTT 消息代理的服务器,比如 `tcp://127.0.0.1:1883` 。 + - **用户名**:可选参数,MQTT 连接用户名。 + - **密码**:可选参数,MQTT 连接密码。 + - **MQTT 协议版本**:MQTT 协议版本,支持 3.1 或者 3.1.1 ,默认为 3.1.1。 - **客户端 ID**:MQTT 连接的客户端 ID。 如果未指定,将使用一个 uuid。 - - **QoS 级别**:默认订阅 QoS 级别,可选值:0、1、2 - - **证书类型**:证书路径。可以为绝对路径,也可以为相对路径。如果指定的是相对路径,那么父目录为执行 server 命令的路径。 - - **私钥路径**:私钥路径。可以为绝对路径,也可以为相对路径。 - - **根证书路径**:根证书路径,用以验证服务器证书。可以为绝对路径,也可以为相对路径。 - - **跳过证书验证**:控制是否跳过证书认证。如设置为 True,将跳过证书认证;否则进行证书验证。 - - **Kubeedge 版本号**:Kubeedge 版本号,不同的版本号对应的文件内容不同。 - - **KubeEdge 模版文件**:KubeEdge 模版文件名,文件指定放在 `etc/sources` 文件夹中。 - - **解压缩**:使用指定的压缩方法解压缩 MQTT Payload,可选值:zlib、gzip、flate。 -- **流格式**:支持 json、binary、protobuf、delimited、custom。 + - **QoS 级别**:默认 QoS 级别为0,可选值:0、1、2。 + - **证书类型**:可选参数,填写证书路径,可以为绝对路径,也可以为相对路径。如果指定的是相对路径,那么父目录为执行 neuronex 命令的路径。示例值:`/var/xyz-certificate.pem` + - **私钥路径**:可选参数,可以为绝对路径,也可以为相对路径。示例值:`/var/xyz-private.pem.key` + - **根证书路径**:可选参数,用以验证服务器证书。可以为绝对路径,也可以为相对路径。示例值:`/var/xyz-rootca.pem` + - **跳过证书验证**:默认为 False。如设置为 True,将跳过证书认证,否则进行证书验证。 + - **解压缩**:默认留空为不解压缩。使用指定的压缩方法解压缩 MQTT Payload,可选值:zlib、gzip、flate。 +- **流格式**:支持 json、binary、protobuf、delimited、custom。默认 json 格式。 - 如选择 protobuf 或 custom,还应配置对应的[模式和模式消息](./config.md#模式) - - 如选择 delimited,还应配置分隔符,如 "," + - 如选择 delimited,还应配置分隔符,如 "`,`" -- **时间戳字段**:指定代表时间的字段。 -- **时间戳格式**:指定时间戳格式。 - **共享**:勾选确认是否共享源。 ## 创建扫描表 @@ -49,9 +39,7 @@ NeuronEX 为 MQTT 源流提供了内置支持,流可以订阅来自 MQTT 代 MQTT 源支持查询表。登录 NeuronEX,点击**数据流处理** -> **源管理**。在**扫描表**页签,点击**创建扫描表**。 - **表名称**:输入表名称 -- **是否为带结构的表**:勾选确认是否为带结构的表,如为带结构的表,则需进一步添加表字段 - - **名称**:字段名称 - - **类型**:支持 bigint、float、string、datetime、boolean、array、struct、bytea +- **是否为带结构的表**:勾选确认是否为带结构的流,如为带结构的流,则需进一步添加流字段。可默认不勾选。 - **表类型**:选择 mqtt。 - **数据源**(MQTT 主题):将要订阅的 MQTT 主题, 例如 topic1 - **配置组**:可使用默认配置组,如希望自定义配置组,可参考[创建流](#创建流)部分 @@ -60,5 +48,3 @@ MQTT 源支持查询表。登录 NeuronEX,点击**数据流处理** -> **源 - 如选择 delimited,还应配置分隔符,如 "," - **保留大小**:指定保留大小。 - -您也可选择通过文本模式进行配置,通过 SQL 定义。 \ No newline at end of file diff --git a/zh_CN/streaming-processing/neuron.md b/zh_CN/streaming-processing/neuron.md index e4e6da5..b2461c3 100644 --- a/zh_CN/streaming-processing/neuron.md +++ b/zh_CN/streaming-processing/neuron.md @@ -2,17 +2,25 @@ 扫描表 - +NeuronEX 数据处理模块通过 `Neuron` 类型的数据源,可以接收来自 NeuronEX 数采模块的数据并通过规则处理分析。 -NeuronEX 为 Neuron 源流提供了内置支持,流可以订阅来自本地 Neuron 的消息并输入 NeuronEX 处理。 -:::tip +在 NeuronEX **数据流处理** -> **源管理**,在**流管理**页签下,NeuronEX 已经默认配置的 `Neuron` 类型的数据源 neuronStream。 -该源仅可用于本地的 Neuron,因为与 Neuron 的通信基于 nanomsg ipc 协议,无法通过网络进行。在 NeuronEX 端,所有 Neuron 源和动作共享同一个 Neuron 连接。 +neuronstream -由于拨号到 Neuron 是异步的,因此即使 Neuron 停机,Neuron sink 的规则也不会看到报错。用户可通过消息流入数量判断连接是否正常。 -::: + +将 NeuronEX 数采模块的数据发送到 neuronStream,需要在 NeuronEX **数据采集** -> **北向应用**页签下,将驱动数据添加订阅到 DataProcessing节点,如下图所示: + +dataprocessing + + +然后,用户可直接在 **数据流处理** -> **规则** -> **新建规则** 里,选择 `neuronStream` 作为数据源,进行规则的创建。 +```sql + +SELECT * FROM neuronStream +``` ## 消息格式 @@ -27,9 +35,8 @@ Neuron 发过来的消息为固定的 json 格式,如下所示: "tag_name1": 11.22, "tag_name2": "string" }, - "errors": { - "tag_name3": 122 - } + "errors": {}, + "metas":{} } ``` @@ -37,41 +44,34 @@ Neuron 发过来的消息为固定的 json 格式,如下所示: ## 创建流 -登录 NeuronEX,点击**数据流处理** -> **源管理**。在**流管理**页签,点击**创建流**。 +创建 `Neuron` 类型的流。登录 NeuronEX,点击**数据流处理** -> **源管理**。在**流管理**页签,点击**创建流**。 在弹出的**源管理** / **创建**页面,进入如下配置: - **流名称**:输入流名称 -- **是否为带结构的流**:勾选确认是否为带结构的流,如为带结构的流,则需进一步添加流字段 - - - **名称**:字段名称 - - **类型**:支持 bigint、float、string、datetime、boolean、array、struct、bytea +- **是否为带结构的流**:默认不勾选即可 - **流类型**:选择 neuron。 - **配置组**:可使用默认配置组,如希望自定义配置组,可点击添加配置组按钮,在弹出的对话框中进行如下设置,设置完成后,可点击**测试连接**进行测试: - **名称**:输入配置组名称。 - - **路径**:连接 Neuron 的 nng url -- **流格式**:支持 json、binary、protobuf、delimited、custom。 - - 如选择 protobuf 或 custom,还应配置对应的[模式和模式消息](./config.md#模式) - - 如选择 delimited,还应配置分隔符,如 "," -- **时间戳字段**:指定代表时间的字段。 -- **时间戳格式**:指定时间戳格式。 -- **共享**:勾选确认是否共享源。 + - **路径**:连接 NeuronEX 数采模块服务的 url,默认为tcp://127.0.0.1:7081,需要与 NeuronEX 数采模块的url匹配。 +- **流格式**:选择 `json` 格式。 +- **共享**:勾选是否共享源,勾不勾选皆可。 + +:::tip 提示 +默认已经配置了一个名为 neuronStream 的 `Neuron` 类型数据源,用户可直接使用。当neuronStream被删除后,可通过上述步骤进行创建。 +::: ## 创建扫描表 -MQTT 源支持查询表。登录 NeuronEX,点击**数据流处理** -> **源管理**。在**扫描表**页签,点击**创建扫描表**。 +创建 `Neuron` 类型的扫描表。登录 NeuronEX,点击**数据流处理** -> **源管理**。在**扫描表**页签,点击**创建扫描表**。 - **表名称**:输入表名称 -- **是否为带结构的表**:勾选确认是否为带结构的表,如为带结构的表,则需进一步添加表字段 - - **名称**:字段名称 - - **类型**:支持 bigint、float、string、datetime、boolean、array、struct、bytea +- **是否为带结构的表**:默认不勾选即可 - **表类型**:选择 neuron。 -- **配置组**:可使用默认配置组,如希望自定义配置组,可参考[创建流](#创建流)部分 -- **表格式**:支持 json、binary、delimited、custom。 - - 如选择 custom,还应配置对应的[模式和模式消息](./config.md#模式) - - 如选择 delimited,还应配置分隔符,如 "," - -- **保留大小**:指定保留大小。 +- **配置组**:可使用默认配置组,如希望自定义配置组,可点击添加配置组按钮,在弹出的对话框中进行如下设置,设置完成后,可点击**测试连接**进行测试: -您也可选择通过文本模式进行配置,通过 SQL 定义。 \ No newline at end of file + - **名称**:输入配置组名称。 + - **路径**:连接 NeuronEX 数采模块服务的 url,默认为tcp://127.0.0.1:7081,需要与 NeuronEX 数采模块的url匹配。 +- **表格式**:选择 `json` 格式。 +- **保留大小**:指定保留大小,默认为1。 diff --git a/zh_CN/streaming-processing/overview.md b/zh_CN/streaming-processing/overview.md index 75b4379..1bd9af7 100644 --- a/zh_CN/streaming-processing/overview.md +++ b/zh_CN/streaming-processing/overview.md @@ -23,9 +23,9 @@ NeuronEX 数据流处理模块的架构如下: ### AI/ML 集成 可通过 Python 或者 Go 扩展接入自定义函数、 AI/ML 算法等,实现在源(Source),函数(Function), 动作(Sink) 三个方面的扩展. -- 源 (Source) :允许用户接入更多的数据源用于数据分析 -- 目标 (Sink):允许用户将分析结果发送到不同的扩展系统中 -- 函数(Function):允许用户增加自定义函数用于数据分析(比如,AI/ML 的函数调用) +- [源 (Source)](./source.md) :允许用户接入更多的数据源用于数据分析 +- [目标 (Sink)](./sink/sink.md):允许用户将分析结果发送到不同的扩展系统中 +- [函数(Function)](./extension.md):允许用户增加自定义函数用于数据分析(比如,AI/ML 的函数调用) ### 多源数据接入集成 具备各类数据灵活获取的能力,可以支持: @@ -45,63 +45,65 @@ NeuronEX 数据流处理模块的架构如下: ### 源(Source) -[源(Source)](./source.md)定义了与外部系统的连接方式,以便将数据加载进来。在规则中,根据数据使用逻辑,数据源可作为流或者表使用。 +[源(Source)](./source.md)定义了与外部系统的连接方式,以便将数据加载进来。在规则中,根据数据使用逻辑,数据源可作为 **流(Stream)** 或者 **表(Table)** 使用。 -- [流(Stream)](./stream.md):流是 NeuronEX 中数据源连接器的运行形式,用户可通过指定源类型来定义如何连接到外部资源。流的作用就像规则的触发器,每个事件都会触发规则中的计算。 +- [流(Stream)](./stream.md):流是 NeuronEX 中数据源接入的主要运行方式,用户可通过选择数据源类型及配置参数来定义如何连接到外部资源。数据流中有数据流入时,都会触发规则中的计算。 -- [表(Table)](./tables.md):表 (**Table**) 用于表示流的当前状态。它可以被认为是流的快照,您可通过表对数据进行批处理。 +- [表(Table)](./tables.md):表 (**Table**) 用于表示流的当前状态。它可以被认为是流的快照,您可通过表对数据进行批处理。通过在规则中组合使用**流**与**表**,可以实现更多的数据处理功能。 - - [扫描表](./scan.md):像一个由事件驱动的流一样,逐个加载数据事件。该模式的源可以用在流或扫描表中。 + - [扫描表](./scan.md):在 NeuronEX 在内存中保存状态数据,像一个由事件驱动的流一样,逐个加载数据事件。 - - [查询表](./lookup.md):在需要时引用外部内容,只用于查找表。 + - [查询表](./lookup.md):绑定外部数据(比如 SQL 数据库中的数据)并在需要时查询引用外部数据。 - **源的解码**:用户可以在创建源时通过指定 `format` 属性来定义解码方式。当前支持 `json`、`binary`、`protobuf` 和 `delimited` 格式,你也可以使用自己的编码格式,并将该字段定义为 `custom`。 -- **源的定义与运行**:定义数据源的流或者表之后,系统实际上只是创建了一个数据源的逻辑定义而非真正物理运行的数据输入。此逻辑定义可在多个规则的 SQL 的 `from` 子句中使用。只有当使用了该定义的规则启动之后,数据流才会真正运行。 +- **源的定义与运行**:创建数据源的流或者表之后,系统实际上只是创建了一个数据源的逻辑定义而非真正物理运行的数据输入。此逻辑定义可在多个规则的 SQL 的 `from` 子句中使用。只有当使用了该定义的规则启动之后,数据流才会真正运行。 目前 NeuronEX 内置以下数据源: -- [文件](./file.md):从文件中读取数据,通常用作表格,可以作为流、扫描表的数据源; -- [内存](./memory.md):从内存主题读取数据以形成规则管道,可以作为流、扫描表和查询表的数据源; -- [HTTP pull](./http_pull.md):从 HTTP 服务器中拉取数据,可以作为流、扫描表的数据源; -- [HTTP push](./http_push.md):通过 http 推送数据到数据处理模块,可以作为流、扫描表的数据源; +- [Neuron](./neuron.md): 从 NeuronEX 数采模块读取数据,可以作为流、扫描表的数据源; - [MQTT](./mqtt.md):从 MQTT 主题读取数据,可以作为流、扫描表的数据源; -- [Neuron](./neuron.md): 从本地 Neuron 实例读取数据,可以作为流、扫描表的数据源; -- [Redis](./redis.md):从 Redis 中查询数据,可以作为查询表的数据源。 +- [HTTP pull](./http_pull.md):从 HTTP 服务器中拉取数据,可以作为流、扫描表的数据源; +- [HTTP push](./http_push.md):通过 NeuronEX 内置的HTTP Server源接收 HTTP 客户端消息,可以作为流、扫描表的数据源; +- [内存](./memory.md):从内存主题读取数据以形成规则流水线,可以作为流、扫描表和查询表的数据源; +- [SQL](./sql.md):从 `sqlserver\postgres\mysql\sqlite3\oracle`` 数据库中获取数据; +- [文件](./file.md):从文件中读取数据,通常用作表格,可以作为流、扫描表的数据源; +- [Video](./video.md):从视频流中获取数据,可以作为流、扫描表的数据源。 ### [规则(Rule)](./rules.md) -规则代表了一个数据流处理流程,定义了从数据源输入、到各种处理逻辑,再到将数据输出到Sink动作。 +规则代表了一个数据流处理流程,定义了从数据源输入、到各种处理逻辑,再到将数据输出到动作(Sink)。 - **规则生命周期**:规则一旦启动就会连续运行,只有在用户明确发送停止命令时才会停止。规则可能会因为错误或 NeuronEX 实例退出而异常停止。 - **多个规则的关系**:规则在运行时上是分开的,一个规则的错误不影响其他规则。所有的规则都共享相同的硬件资源,每条规则可以指定算子缓冲区,以限制处理速度,避免占用所有资源。 -- **规则流水线**:多个规则可以通过指定 `内存 Source/Sink` 形成一个处理管道。例如,第一条规则在内存 sink 中保存结果,其他规则在其内存源中订阅主题获取数据。除了通过`内存 Source/Sink`,用户还可以使用 `MQTT Source/Sink` 来连接规则。 +- [**规则流水线**](./rule_pipeline.md):多个规则可以通过指定 `内存 Source/Sink` 形成一个处理管道。例如,第一条规则在内存 sink 中保存结果,其他规则在其内存源中订阅主题获取数据。除了通过`内存 Source/Sink`,用户还可以使用 `MQTT Source/Sink` 来连接规则。 -- **[SQL语句](./sqls/overview.md)**:规则中的 SQL 语言支持包括数据定义语言(DDL)、数据操作语言(DML)和查询语言。NeuronEX 中的 SQL 支持是 ANSI SQL 的一个子集,并有一些定制的扩展。 +- **[SQL语句](./sqls/overview.md)**:NeuronEX 数据处理模块提供了一种类似于 SQL 的查询语言,用于对数据流执行转换和计算。规则中的 SQL 语言支持包括数据定义语言(DDL)、数据操作语言(DML)和查询语言。NeuronEX 中的 SQL 支持是 ANSI SQL 的一个子集,并有一些定制的扩展。 ### 动作(Sink) - [动作Sink](./sink/sink.md):动作Sink 用来向外部系统写入数据,一个规则可以有多个动作,不同的动作可以是同一个动作类型。 -- 数据模板:动作的结果是一个字符串。默认情况下,它将被编码为 json 字符串。用户可以通过设置 dataTemplate 来改变格式,它利用 Go 模板语法将结果格式化为字符串。 +- [数据模板](./sink/data_template.md): NeuronEX 通过**规则**进行数据分析处理后,数据模板将规则处理结果进行「二次处理」后,使用各种 Sink 可以往不同的系统。数据模板为非必须配置项,如果不配置数据模板,规则处理结果将直接输出到 Sink 中。 + - 动作类型 - [MQTT sink](./sink/mqtt.md):输出到外部 MQTT 服务。 - - [Neuron sink](./sink/neuron.md):输出到 NeuronEX 数采引擎。 + - [Neuron sink](./sink/neuron.md):输出到 NeuronEX 数采模块。 - [Rest sink](./sink/rest.md):输出到外部 HTTP 服务器。 - - [Redis sink](./sink/redis.md): 写入 Redis 。 - - [File sink](./sink/file.md): 写入文件。 - - [Memory sink](./sink/memory.md):输出到内存(Memory)主题以形成规则管道。 + - [Memory sink](./sink/memory.md):输出到内存(Memory)主题以形成规则流水线。 - [Log sink](./sink/log.md):写入日志,通常只用于调试。 - - [Nop sink](./sink/nop.md):不输出,用于性能测试。 - [SQL sink](./sink/sql.md):写入 SQL 数据库。 - [InfluxDB V1 sink](./sink/influx.md): 写入 Influx DB `v1.x`。 - [InfluxDB V2 sink](./sink/influx2.md): 写入 Influx DB `v2.x`。 + - [File sink](./sink/file.md): 写入文件。 + - [Nop sink](./sink/nop.md):不输出,用于性能测试。 + - - [Kafka sink](./sink/kafka.md):输出到 Kafka 。 + ### 流式处理 @@ -109,20 +111,20 @@ NeuronEX 数据流处理模块的架构如下: 流处理具有低延迟、近实时的特点,可以在数据产生后就进行处理,以极低的延迟获得结果,因此流处理可以用于实时分析、实时计算、实时预测等场景。 - **有状态的流处理**:有状态的流处理是流处理的一个子集,其中的计算保持着上下文状态。有状态流处理的例子包括: - - 聚合事件以计算总和、计数或平均值时。 + - [聚合事件](./sqls/functions/aggregate_functions.md)以计算总和、计数或平均值时。 - 检测事件的变化。 - 在一系列事件中搜索一个模式。 -- **窗口**:窗口提供了一种机制,将无界的数据分割成一系列连续的有界数据来计算。在时间窗口中,同时支持处理时间和事件时间。对于所有支持的窗口类型,请查看窗口函数。在 NeuronEX 中,内置的窗口包括两种类型: +- [**窗口**](./sqls/windows.md):窗口提供了一种机制,将无界的数据分割成一系列连续的有界数据来计算。在时间窗口中,同时支持处理时间和事件时间。对于所有支持的窗口类型,请查看窗口函数。在 NeuronEX 中,内置的窗口包括两种类型: - 时间窗口:按时间分割的窗口 - 计数窗口:按元素计数分割的窗口 -- **多源连接**:在流处理中,连接是将多个数据源合并到一起的唯一方法。它需要一种方法来对齐多个来源并触发连接结果。NeuronEX 支持的连接类型包括 LEFT、RIGHT、FULL 和 CROSS 。 +- [**多源连接**](./sqls/query_language_elements.md#join):在流处理中,连接是将多个数据源合并到一起的唯一方法。它需要一种方法来对齐多个来源并触发连接结果。NeuronEX 支持的连接类型包括 LEFT、RIGHT、FULL 和 CROSS 。 - **流处理的时间概念**: 流数据是一个随时间变化的数据序列,其中时间是数据的一个固有属性。在流处理中,时间在计算中起着重要的作用。例如,在做基于某些时间段(通常称为窗口)的聚合时,定义时间的概念是很重要的。在流处理中,有两种时间概念: - - **事件时间**,即事件实际发生的时间。通常情况下,事件应该有一个时间戳字段来表明其产生的时间。 + - [**事件时间**](./rules.md#规则选项-可选),即事件实际发生的时间。通常情况下,事件应该有一个时间戳字段来表明其产生的时间。 - **处理时间**,即在系统中观察到事件的时间。 - **事件时间和水印**:一个支持事件时间的流处理器需要一种方法来衡量事件时间的进展。例如,创建一个小时的时间窗口时,内部的算子需要在事件时间超过一小时后得到通知,这样算子就可以发布正在进行的窗口。 @@ -132,9 +134,9 @@ NeuronEX 中衡量事件时间进展的机制是水印。水印作为数据流 ### [扩展](./extension.md) -NeuronEX 允许用户自定义扩展,以支持AI/ML等更多功能。 用户可以通过插件系统编写 Python/Go 扩展插件。此外,用户也可以调用外部已有的 REST 或 RPC 服务。您可通过[扩展](./extension.md)页面创建插件或注册外部服务。 +NeuronEX 允许用户自定义扩展,以支持AI/ML等更多功能。 用户可以通过插件系统编写 Python/Go 扩展插件。此外,用户也可以调用外部已有的 REST 服务作为函数及算法扩展来使用。您可通过[扩展](./extension.md)页面创建插件或注册外部服务。 ### [配置](./config.md) -配置页面介绍如何进行资源配置,如配置组和传输与存储模版、连接、模式以及文件管理。 +配置页面介绍 NeuronEX 数据分析模块的[资源](./config.md#资源)配置及[模式](./config.md#模式)配置等。 diff --git a/zh_CN/streaming-processing/portable_python.md b/zh_CN/streaming-processing/portable_python.md index e23c0db..a8060a5 100644 --- a/zh_CN/streaming-processing/portable_python.md +++ b/zh_CN/streaming-processing/portable_python.md @@ -4,7 +4,14 @@ ## 部署要求 使用Python 便携插件,需要有 Python3 环境。 -- 如果您通过安装包的方式安装 NeuronEX ,则需要手动安装 python 3.x 环境。 +- 如果您通过安装包的方式安装 NeuronEX ,则需要 + + 1. 手动安装 python 3.x 环境。 + 2. 通过 pip 安装`ekuiper`和`pynng`库 + ```shell + pip install ekuiper pynng + ``` + - 如果您通过 docker 的方式安装 NeuronEX ,请使用[neuronex:3.x.x-python](../installation/docker.md#docker-容器-python-运行环境)类型的 NeuronEX 镜像,该镜像已经包含了 Python3 环境,以及一些相关的函数库。 ## pysam插件整体介绍 diff --git a/zh_CN/streaming-processing/redis.md b/zh_CN/streaming-processing/redis.md index 3d37084..2422ca6 100644 --- a/zh_CN/streaming-processing/redis.md +++ b/zh_CN/streaming-processing/redis.md @@ -25,5 +25,3 @@ Redis 源支持查询表。登录 NeuronEX,点击**数据流处理** -> **源 - 如选择 delimited,还应配置分隔符,如 "," - **主键**:指定主键。 - -您也可选择通过文本模式进行配置,通过 SQL 定义。 \ No newline at end of file diff --git a/zh_CN/streaming-processing/rule_pipeline.md b/zh_CN/streaming-processing/rule_pipeline.md index 0a2fe6f..4636405 100644 --- a/zh_CN/streaming-processing/rule_pipeline.md +++ b/zh_CN/streaming-processing/rule_pipeline.md @@ -1,10 +1,10 @@ -# 规则管道 +# 规则流水线 -我们可以通过将先前规则的结果导入后续规则来形成规则管道。 这可以通过使用中间存储或 MQ(例如 mqtt 消息服务器)来实现。 通过同时使用内存作为 [源](./memory.md) 和 [Sink 目标](./sink/memory.md),我们可以创建没有外部依赖的规则管道。 +我们可以通过将先前规则的结果导入后续规则来形成规则流水线。通过使用[内存](./memory.md)作为 **数据源** 和 **动作(Sink)**,我们可以创建规则流水线。 ## 使用示例 -规则管道将是隐式的。 每个规则都可以使用一个内存目标/源。 这意味着每个步骤将使用现有的 api 单独创建(示例如下所示)。 +规则流水线将是隐式的。 每个规则都可以使用一个内存目标/源。 这意味着每个步骤将使用现有的 api 单独创建(示例如下所示)。 ```shell #1 创建源流 @@ -50,7 +50,7 @@ ``` -通过使用内存主题作为桥梁,我们现在创建一个规则管道:`rule1->{rule2-1, rule2-2}`。 管道可以是多对多的,而且非常灵活。 +通过使用内存主题作为桥梁,我们现在创建一个规则流水线:`rule1->{rule2-1, rule2-2}`。 流水线可以是多对多的,而且非常灵活。 请注意,内存目标可以与其他目标一起使用,为一个规则创建多个规则动作。 并且内存源主题可以使用通配符订阅过滤后的主题列表。 diff --git a/zh_CN/streaming-processing/rule_status.md b/zh_CN/streaming-processing/rule_status.md new file mode 100644 index 0000000..162f7d5 --- /dev/null +++ b/zh_CN/streaming-processing/rule_status.md @@ -0,0 +1,77 @@ +# 规则管理 + +我们可以在规则页面对规则进行管理,包括对规则进行操作、导入规则、查看规则状态信息等。 + +## 查看规则信息 + +- **ID** + + 规则的唯一标识符,由用户自定义。 + +- **名称** + + 对规则的描述,由用户自定义。 + +- **状态** + + 规则的运行状态,包括 `运行中`、`已停止`。 + +- **告警次数** + + 规则运行过程中的告警次数。 + +- **最后一次告警** + + 规则运行过程中最后一次的告警信息。 + +- **操作** + + 包括规则的`编辑`、 `启停`、`重启`、`拓扑`、`复制`及`删除`。 + +pysam + + + +## 了解规则运行的状态指标 + +当一条规则在 NeuronEX 中运行后,我们可以通过规则指标来了解到当前的规则运行状态。点击规则的 `状态` 即可在右侧边栏查看规则的状态信息,可以获取到以下内容: + +```json +{ + "status": "running", + "source_demo_0_records_in_total": 0, + "source_demo_0_records_out_total": 0, + ...... + "op_2_project_0_records_in_total": 0, + "op_2_project_0_records_out_total": 0, + ...... + "sink_mqtt_0_0_records_in_total": 0, + "sink_mqtt_0_0_records_out_total": 0, + ...... +} +``` + +其中 `status` 代表了 rule 当前的运行状态,`running` 代表规则正在运行。 + +而之后的监控项则代表了规则运行过程中,各个算子的运行情况,其监控项构成则为 `算子类型_算子信息_算子索引_具体监控项`。 + +以 `source_demo_0_records_in_total` 为例,其中 `source` 代表了读数据算子,`demo` 为对应的 stream,`0` 代表了该算子实例在并发度中的索引,而 `records_in_total` 则诠释了实际的监控项,即该算子接收了多少条记录。 + +当我们尝试往该 stream 发送了一条数据后,再次获取该规则状态如下: + +```json +{ + "status": "running", + "source_demo_0_records_in_total": 1, + "source_demo_0_records_out_total": 1, + ...... + "op_2_project_0_records_in_total": 1, + "op_2_project_0_records_out_total": 1, + ...... + "sink_mqtt_0_0_records_in_total": 1, + "sink_mqtt_0_0_records_out_total": 1, + ...... +} +``` + +可以看到每个算子的 `records_in_total` 与 `records_out_total` 都由 0 变为了 1,代表了该算子接收到了一条记录,并向下一个算子传递了一条记录,最终在 `sink` 端向 sink 写入了 1 条记录。 diff --git a/zh_CN/streaming-processing/rules.md b/zh_CN/streaming-processing/rules.md index 4f0b1bb..d225b43 100644 --- a/zh_CN/streaming-processing/rules.md +++ b/zh_CN/streaming-processing/rules.md @@ -1,25 +1,20 @@ # 规则 -在 NeuronEX,计算工作以规则的形式呈现。规则以流的数据源为输入,通过 SQL 定义计算逻辑,将结果输出到动作/sink 中。规则定义提交后,它将持续运行。它将不断从源获取数据,根据 SQL 逻辑进行计算,并根据结果触发行动。 +在 NeuronEX 中通过 **规则(Rule)** 处理所有计算逻辑。规则以数据源作为输入,通过 **SQL** 定义计算逻辑,将结果输出到 **动作(Sink)** 中。规则定义提交后,它将持续运行。它将不断从源获取数据,根据 SQL 逻辑进行计算,并根据结果实时触发 **动作(Sink)**。 + +NeuronEX 支持同时运行多个规则。这些规则在同一个内存空间中运行,共享相同的硬件资源。多条并行规则在运行时上是分开的,某条规则的错误不会影响其他规则。 ::: tip -目前,NeuronEX 只支持流处理规则,因此至少有一个规则源应该是连续流。关于如何创建流,可参考 [数据流 - Stream](./stream.md)。 +NeuronEX 规则 SQL 中至少有一个数据源应该是`流(Stream)`类型。关于如何创建流,可参考 [数据流 - Stream](./stream.md)。 ::: -此外,NeuronEX 支持同时运行多个规则。这些规则在同一个内存空间中运行,共享相同的硬件资源。多条并行规则在运行时上是分开的,某条规则的错误不会影响其他规则。 - -## [规则管道](./rule_pipeline.md) - -多个规则可以通过指定 sink /源的联合点形成一个处理管道。例如,第一条规则在内存 sink 中产生结果,其他规则在其内存源中订阅该主题。 - -NeuronEX 支持通过批量导入规则,或通过 Web 界面创建规则。 ## 创建规则 -在 NeuronEX Web 端界面,点击**数据流处理** -> **规则**。点击**创建规则**按钮,即可通过图形化的方式创建规则: +在 NeuronEX Web 端界面,点击**数据流处理** -> **规则**。点击**创建规则**按钮: - 规则 ID:输入规则 ID,规则 id 在同一 NeuronEX 实例中必须唯一。 @@ -37,71 +32,58 @@ NeuronEX 支持通过批量导入规则,或通过 Web 界面创建规则。 ## 规则 SQL -通过指定 `sql` 和 `actions` 属性,以声明的方式定义规则的业务逻辑。其中,`sql` 定义了针对预定义流运行的 SQL 查询,这将转换数据。然后,输出的数据可以通过 `action` 路由到多个位置。 +规则`sql` 中定义了要处理的流或表,以及如何处理。规则 SQL 是,并将处理结果发送到一个或多个动作(Sink)。规则`sql` 中可以使用内置函数和运算符,也可以使用自定义函数和算法。 -最简单的规则 SQL 如 `SELECT * FROM demo`。它有类似于 ANSI SQL 的语法,并且可以利用 NeuronEX 运行时提供的丰富的运算符和函数。参见 [SQL](https://ekuiper.org/docs/zh/latest/sqls/functions/overview.html) 获取更多 SQL 信息。 +最简单的规则 SQL 如 `SELECT * FROM neuronStream`,这条规则会从 `neuronStream` 数据流中获取所有数据。 NeuronEX 提供的丰富的运算符和函数,更多用法请参见 [SQL](./sqls/overview.md)章节 。 -大部分的 SQL 子句都是定义逻辑的,除了负责指定流的 `FROM` 子句。在这个例子中,`demo` 是一个流。通过使用连接子句,可以有多个流或流/表。作为一个流引擎,一个规则中必须至少有一个流。 +## 添加 动作(Sink) -因此,这里的 SQL 查询语句实际上定义了两个部分。 - -- 要处理的流或表。 -- 如何处理。 - -## 添加 Sink - -动作部分定义了一个规则的输出行为。每个规则可以有多个动作。一个动作是一个 sink 连接器的实例。当定义动作时,键是 sink 连接器的类型名称,而值是其属性。 +动作(Sink)部分定义了一个规则的输出行为。每个规则可以有多个动作。 在**动作**区域,点击**添加**按钮。 -1. 选择 Sink。Sink 用来向外部系统写入数据,分为内置动作和扩展动作两类。您可前往 [Sink](./sink/sink.md) 页面获取详细的配置信息。 +1. 选择动作。动作用来向外部系统写入数据,您可前往 [动作(Sink)](./sink/sink.md) 页面获取详细的配置信息。 目前 NeuronEX 内置的 Sink 列表: - [MQTT sink](./sink/mqtt.md):输出到外部 MQTT 服务。 - - [Neuron sink](./sink/neuron.md):输出到本地的 Neuron 实例。 + - [Neuron sink](./sink/neuron.md):输出到 NeuronEX 数采模块。 - [Rest sink](./sink/rest.md):输出到外部 HTTP 服务器。 - - [Redis sink](./sink/redis.md): 写入 Redis。 - - [文件 sink](./sink/file.md): 写入文件。 - - [内存 sink](./sink/memory.md):输出到内存主题以形成规则管道。 + - [内存 sink](./sink/memory.md):输出到内存主题以形成规则流水线。 - [Log sink](./sink/log.md):写入日志,通常只用于调试。 + - [SQL sink](./sink/file.md): 写入关系型数据库Mysql、Sqlserver、Sqlite等。 + - [InfluxDB sink](./sink/memory.md):写入InfluxDB数据库。 + - [InfluxDB V2 sink](./sink/log.md):写入InfluxDB数据库。 + - [文件 sink](./sink/file.md): 写入文件。 - [Nop sink](./sink/nop.md):不输出,用于性能测试。 -2. Resource ID:选择资源 ID 以实现配置的复用。 -3. 是否忽略输出:可选值 True、False,默认为 False,则忽略输出。 -4. 将结果数据按条发送: - - 默认为 false,输出格式为 `{"result":"${the string of received message}"}`, 例如,`{"result":"[{"count":30},""count":20}]"}` - - 如设为 true,结果消息将与实际字段名称一一对应发送。 对于上述示例,它将先发送 `{"count":30}`,再发送 `{"count":20}` -5. 流格式:支持 json、binary、protobuf、delimited、custom。 - - 如选择 protobuf 或 custom,还应配置对应的[模式和模式消息](./config.md#模式) - - 如选择 delimited,还应配置分隔符,如 "," -6. 数据模版:Golang 模板,用于指定输出数据格式。如不指定数据模板,则将数据作为原始输入。关于数据模版的详细介绍,见 [数据模版](./sink/data_template.md) -7. 此外,您可点击展开**高级**部分实现更加定制化的设置。 - - 线程数:设置运行的线程数。该参数值大于 1 时,消息发出的顺序可能无法保证。 - - 缓存大小:设置可缓存消息数目。若缓存消息数超过此限制,sink 将阻塞消息接收,直到缓存消息数目小于限制为止。 - - 是否启用缓存:设置是否启用缓存,可选值 True、False - - 停止时是否清理缓存:设置停止时是否清理缓存,可选值 True、False - - 内存缓存阈值:内存中缓存的最大消息数。 - - 最大磁盘缓存:缓存在磁盘中的最大消息数。 - - 缓冲区页面大小:缓冲区页的消息数,单位为批量读/写磁盘,防止频繁 IO。 - - 重发间隔:重新发送缓存消息的时间间隔(毫秒)。 - - 是否异步运行:设置是否异步运行输出操作以提升性能。请注意,异步运行时,将无法保证输出结果的顺序。 + + + 完成设置后,点击**测试连接**,确认设置后,点击**提交**完成动作的创建。 -## 规则选项 +## 规则选项 (可选) 点开**选项**部分,可继续对当前规则进行配置: -- 是否使用事件时间:是否使用事件时间作为的时间戳。 如使用事件时间,则将从有效负载中提取时间戳。有关时间戳的配置,见 [流 - 时间戳](./stream.md#时间戳与时间戳格式)。 -- 是否发送元数据:指定是否将事件的元数据发送到目标,可选值 True、False -- 延迟多少毫秒:在元素延迟到达的情况,指定在删除元素之前可以延迟多少时间(单位为 ms)。 默认为 0,表示后期元素将被删除。 -- 线程数:一条规则运行时会根据 sql 语句分解成多个 plan 运行。该参数设置每个 plan 运行的线程数。该参数值大于 1 时,消息处理顺序可能无法保证。 -- 缓存大小:指定每个 plan 可缓存消息数。若缓存消息数超过此限制,plan 将阻塞消息接收,直到缓存消息数目小于限制为止。此选项值越大,消息吞吐能力越强,但内存占用也会越多。 -- 流的 QoS:指定流的 QoS。 值:0、1、2 -- 检查点间隔毫秒数:指定触发检查点的时间间隔(单位为 ms)。 该设置仅在 qos 大于 0 时发挥作用。 -- 最大重试次数:最大重试次数。如果设置为 0,该规则将立即失败,不会进行重试 -- 重试间隔时间:默认的重试间隔时间,单位为毫秒。如果没有设置**重试间隔时间系数**(`multiplier`),重试的时间间隔将固定为这个值。 -- 重试的最大间隔时间:重试的最大间隔时间,单位为毫秒。仅在同时设置**重试间隔时间系数**(`multiplier`)时生效。 -- 重试间隔时间系数:重试间隔时间的乘数。 -- 随机值系数:添加或减去延迟的随机值系数,防止在同一时间重新启动多个规则。 +| 选项名 | 类型和默认值 | 说明 | +|--------------------|------------|------------------------------------------------------------------------------------------------| +| 是否开启Debug日志 | bool:false | 指定该条规则是否开启 Debug Level 的日志水平,缺省情况下会继承全局配置中的 Debug 配置参数。 | +| 线程数 | int: 1 | 一条规则运行时会根据 sql 语句分解成多个 plan 运行。该参数设置每个 plan 运行的线程数。该参数值大于1时,消息处理顺序可能无法保证。 | +| 缓存大小 | int: 1024 | 指定每个 plan 可缓存消息数。若缓存消息数超过此限制,plan 将阻塞消息接收,直到缓存消息被消费使得缓存消息数目小于限制为止。此选项值越大,则消息吞吐能力越强,但是内存占用也会越多。 | +| 是否发送元数据 | bool:false | 指定是否将事件的元数据发送到目标。 如果为 true,则目标可以获取元数据信息。 | +| 是否使用事件时间 | bool:false | 使用事件时间还是将时间用作事件的时间戳。 如果使用事件时间,则将从有效负载中提取时间戳。 必须通过**数据流**定义指定时间戳。 | +| 事件时间延迟 | int64:0 | 在使用事件时间窗口时,可能会出现元素延迟到达的情况。 LateTolerance 可以指定在删除元素之前可以延迟多少时间(单位为 ms)。 默认情况下,该值为0,表示后期元素将被删除。 | +| 流的QoS | int:0 | 指定流的 qos。 值为0对应最多一次; 1对应至少一次,2对应恰好一次。 如果 qos 大于0,将激活检查点机制以定期保存状态,以便可以从错误中恢复规则。 | +| 检查点间隔 | int:300000 | 指定触发检查点的时间间隔(单位为 ms)。 仅当 qos 大于0时才有效。 | +| 最大重试次数 | int: 0 | 最大重试次数。如果设置为0,该规则将立即失败,不会进行重试。 | +| 重试间隔时间 | int: 1000 | 默认的重试间隔时间,以毫秒为单位。如果没有设置 `multiplier`,重试的时间间隔将固定为这个值。 | +| 重试最大间隔时间 | int: 30000 | 重试的最大间隔时间,单位是毫秒。只有当 `multiplier` 有设置时,从而使得每次重试的延迟都会增加时才会生效。 | +| 重试间隔时间乘数 | float: 2 | 重试间隔时间的乘数。 | +| 随机值系数 | float: 0.1 | 添加或减去延迟的随机值系数,防止在同一时间重新启动多个规则。 | + + +:::tip 提示 +大多数场景下,规则选项采用默认值即可。 +::: 完成设置后,点击**提交**,完成当前规则的创建。新建规则将出现在规则列表中。您可在此查看规则状态、编辑规则、停止规则、刷新规则、查看规则拓扑图,复制规则或删除规则。 @@ -124,6 +106,15 @@ NeuronEX 支持通过批量导入规则,或通过 Web 界面创建规则。 在 NeuronEX Web 端界面,点击**数据流处理** -> **规则**,点击**导出规则**按钮,将会导出当前规则的 JSON 文件。 +## [规则状态](./rule_status.md) + +当一条规则在 NeuronEX 中运行后,我们可以通过规则指标来了解到当前的规则运行状态。详情请参见[规则状态](./rule_status.md)。 + +## [规则流水线](./rule_pipeline.md) + +多个规则可以通过指定 sink /源的联合点形成一个处理管道。例如,第一条规则在内存 sink 中产生结果,其他规则在其内存源中订阅该主题。更多用法请参见[规则流水线](./rule_pipeline.md)章节 。 + + -- **线程数**:设置运行的线程数。该参数值大于 1 时,消息发出的顺序可能无法保证。 -- **缓存大小**:设置可缓存消息数目。若缓存消息数超过此限制,sink 将阻塞消息接收,直到缓存消息数目小于限制为止。 -- **是否启用缓存**:设置是否启用缓存,可选值 True、False -- **停止时是否清理缓存**:设置停止时是否清理缓存,可选值 True、False -- **内存缓存阈值**:内存中缓存的最大消息数。 -- **最大磁盘缓存**:缓存在磁盘中的最大消息数。 -- **缓冲区页面大小**:缓冲区页的消息数,单位为批量读/写磁盘,防止频繁 IO。 -- **重发间隔**:重新发送缓存消息的时间间隔(毫秒)。 -- **是否异步运行**:设置是否异步运行输出操作以提升性能。请注意,异步运行时,将无法保证输出结果的顺序。 - 完成设置后,可点击**测试连接**确认连接情况。最后点击**提交**,完成设置。 - ## 示例 以下为使用 SAS 连接到 Azure IoT Hub 的样例。 diff --git a/zh_CN/streaming-processing/sink/neuron.md b/zh_CN/streaming-processing/sink/neuron.md index f8d0cfa..c01a91e 100644 --- a/zh_CN/streaming-processing/sink/neuron.md +++ b/zh_CN/streaming-processing/sink/neuron.md @@ -1,6 +1,6 @@ # Neuron Sink -该动作用于将结果发送到 NeuronEX 实例的数采引擎模块中以实现设备反控。 +该动作用于将结果发送到 NeuronEX 实例的数采模块中以实现设备反控。 如希望使用 Neuron Sink 连接器,点击 **数据流处理** -> **规则** -> **新建规则**,在 **动作** 区域,点击**添加**,**Sink** 选择 **Neuron**。 @@ -13,46 +13,15 @@ ::: - **名称**:输入名称 -- **路径**:连接 NeuronEX 实例的数采引擎模块的 URL,默认为 `tcp://127.0.0.1:7081` +- **路径**:连接 NeuronEX 实例的数采模块的 URL,默认为 `tcp://127.0.0.1:7081` - **节点名称**:发送到 neuron 的节点名,值可以为动态参数模板。使用非 raw 模式时必须配置此选项。 - **分组名称**:发送到 neuron 的组名,值可以为动态参数模板。使用非 raw 模式时必须配置此选项。 -- **MQTT 客户端标志符(ClientID)**:MQTT 连接的客户端 ID。 如果未指定,将使用一个 uuid。 -- **MQTT 协议版本**:MQTT 协议版本,支持 3.1 或者 3.1.1 ,默认为 3.1。 -- **QoS 级别**:默认订阅 QoS 级别,可选值:0、1、2 -- **用户名**:MQTT 连接用户名。 -- **密码**:MQTT 连接密码。 -- **证书路径**:可以为绝对路径,也可以为相对路径。如果指定的是相对路径,那么父目录为执行 kuiperd 命令的路径。比如,如果你在 `/var/kuiper` 中运行 `bin/kuiperd`,那么父目录为 `/var/kuiper`; 如果运行从 `/var/kuiper/bin` 中运行 `./kuiperd`,那么父目录为 `/var/kuiper/bin` -- **私钥路径**:私钥路径。可以为绝对路径,也可以为相对路径。 -- **根证书路径**:根证书路径,用以验证服务器证书。可以为绝对路径,也可以为相对路径。 -- **跳过证书验证**:控制是否跳过证书认证。如设置为 True,将跳过证书认证;否则进行证书验证。 -- **压缩**:使用指定的方法压缩 MQTT Payload,可选值:zlib、gzip、flate。 -- **是否忽略输出**:可选值 True、False,默认为 False,则忽略输出。 -- **将结果数据按条发送**: - - - 默认为 false,输出格式为 `{"result":"${the string of received message}"}`, 例如,`{"result":"[{"count":30},""count":20}]"}` - - 如设为 true,结果消息将与实际字段名称一一对应发送。 对于上述示例,它将先发送 `{"count":30}`,再发送 `{"count":20}` - -- **流格式**:支持 json、binary、protobuf、delimited、custom。 - - 如选择 protobuf 或 custom,还应配置对应的[模式和模式消息](../config.md) - - 如选择 delimited,还应配置分隔符,如 "," +- **是否忽略输出**:默认为 False。 +- **将结果数据按条发送**:默认为 True。 +- **流格式**:默认为 `json`。 - **数据模版**:Golang 模板,用于指定输出数据格式。如不指定数据模板,则将数据作为原始输入。关于数据模版的详细介绍,见 [数据模版](./data_template.md) -## 通用配置 - -您可点击展开**高级**部分实现更加定制化的设置。 - -- **连接器**:重用到 MQTT Broker 的连接,具体可参考 [MQTT 源](../mqtt.md)。 -- **线程数**:设置运行的线程数。该参数值大于 1 时,消息发出的顺序可能无法保证。 -- **缓存大小**:设置可缓存消息数目。若缓存消息数超过此限制,sink 将阻塞消息接收,直到缓存消息数目小于限制为止。 -- **是否启用缓存**:设置是否启用缓存,可选值 True、False -- **停止时是否清理缓存**:设置停止时是否清理缓存,可选值 True、False -- **内存缓存阈值**:内存中缓存的最大消息数。 -- **最大磁盘缓存**:缓存在磁盘中的最大消息数。 -- **缓冲区页面大小**:缓冲区页的消息数,单位为批量读/写磁盘,防止频繁 IO。 -- **重发间隔**:重新发送缓存消息的时间间隔(毫秒)。 -- **是否异步运行**:设置是否异步运行输出操作以提升性能。请注意,异步运行时,将无法保证输出结果的顺序。 - 完成设置后,可点击**测试连接**确认连接情况。最后点击**提交**,完成设置。 ## 示例 diff --git a/zh_CN/streaming-processing/sink/nop.md b/zh_CN/streaming-processing/sink/nop.md index 5feb88c..5074ef6 100644 --- a/zh_CN/streaming-processing/sink/nop.md +++ b/zh_CN/streaming-processing/sink/nop.md @@ -14,28 +14,9 @@ - **名称**:输入名称 - **打印日志**:是否将结果打印到日志,默认为 false,即不打印到日志文件。 -- **是否忽略输出**:可选值 True、False,默认为 False,则忽略输出。 -- **将结果数据按条发送**: - - - 默认为 false,输出格式为 `{"result":"${the string of received message}"}`, 例如,`{"result":"[{"count":30},""count":20}]"}` - - 如设为 true,结果消息将与实际字段名称一一对应发送。 对于上述示例,它将先发送 `{"count":30}`,再发送 `{"count":20}` -- **流格式**:支持 json、binary、protobuf、delimited、custom。 - - 如选择 protobuf 或 custom,还应配置对应的[模式和模式消息](../config.md) - - 如选择 delimited,还应配置分隔符,如 "," +- **是否忽略输出**:默认为 False。 +- **将结果数据按条发送**:默认为 True。 +- **流格式**:默认为 `json`。 - **数据模版**:Golang 模板,用于指定输出数据格式。如不指定数据模板,则将数据作为原始输入。关于数据模版的详细介绍,见 [数据模版](./data_template.md) -## 通用配置 - -您可点击展开**高级**部分实现更加定制化的设置。 - -- **线程数**:设置运行的线程数。该参数值大于 1 时,消息发出的顺序可能无法保证。 -- **缓存大小**:设置可缓存消息数目。若缓存消息数超过此限制,sink 将阻塞消息接收,直到缓存消息数目小于限制为止。 -- **是否启用缓存**:设置是否启用缓存,可选值 True、False -- **停止时是否清理缓存**:设置停止时是否清理缓存,可选值 True、False -- **内存缓存阈值**:内存中缓存的最大消息数。 -- **最大磁盘缓存**:缓存在磁盘中的最大消息数。 -- **缓冲区页面大小**:缓冲区页的消息数,单位为批量读/写磁盘,防止频繁 IO。 -- **重发间隔**:重新发送缓存消息的时间间隔(毫秒)。 -- **是否异步运行**:设置是否异步运行输出操作以提升性能。请注意,异步运行时,将无法保证输出结果的顺序。 - 完成设置后,可点击**测试连接**确认连接情况。最后点击**提交**,完成设置。 diff --git a/zh_CN/streaming-processing/sink/redis.md b/zh_CN/streaming-processing/sink/redis.md index fab438a..18e4b05 100644 --- a/zh_CN/streaming-processing/sink/redis.md +++ b/zh_CN/streaming-processing/sink/redis.md @@ -1,5 +1,7 @@ # Redis 目标(Sink) +updatable + 如希望使用 Redis Sink 连接器,点击 **数据流处理** -> **规则** -> **新建规则**,在 **动作** 区域,点击**添加**,**Sink** 选择 **redis**。 ## 传输与存储配置 @@ -17,29 +19,11 @@ - **Key**:设置 Redis 的键值(key),用户只需配置 **Key** 或 **Key 字段**,推荐通过 **Key 字段** 配置 Redis 数据的键值(key)。 - **Key 字段**:使用 json 属性为 Redis 的键值(key),例如,**Key 字段**设为 `deviceName`,收到信息为 `{“deviceName":"abc"}`,那存入 Redis 用的键值(Key)即 `abc`。注意:该属性(如 `deviceName`)应已定义且为 string 类型。如属性未定义,那么该属性将被直接作为新的 key 值进行处理。注意:如设置了 **Key 字段**,则无需再配置**数据模版**。 - **数据类型**:选择 string 或者 list,默认为 string。注意:修改类型后,需首先在 Redis 中删除原有的 key,修改才会生效。 -- **是否忽略输出**:可选值 True、False,默认为 False,则忽略输出。 -- **将结果数据按条发送**: - - - 默认为 false,输出格式为 `{"result":"${the string of received message}"}`, 例如,`{"result":"[{"count":30},""count":20}]"}` - - 如设为 true,结果消息将与实际字段名称一一对应发送。 对于上述示例,它将先发送 `{"count":30}`,再发送 `{"count":20}` -- **流格式**:支持 json、binary、protobuf、delimited、custom。 - - 如选择 protobuf 或 custom,还应配置对应的[模式和模式消息](../config.md) - - 如选择 delimited,还应配置分隔符,如 "," -- **数据模版**:Golang 模板,用于指定输出数据格式。如不指定数据模板,则将数据作为原始输入。关于数据模版的详细介绍,见 [数据模版](./data_template.md)。注意:如使用 **Key 字段**配置 Redis 的键值(key),则无需配置数据模版。 - -## 通用配置 - -您可点击展开**高级**部分实现更加定制化的设置。 +- **是否忽略输出**:默认为 False。 +- **将结果数据按条发送**:默认为 True。 +- **流格式**:默认为 `json`。 +- **数据模版**:Golang 模板,用于指定输出数据格式。如不指定数据模板,则将数据作为原始输入。关于数据模版的详细介绍,见 [数据模版](./data_template.md) -- **线程数**:设置运行的线程数。该参数值大于 1 时,消息发出的顺序可能无法保证。 -- **缓存大小**:设置可缓存消息数目。若缓存消息数超过此限制,sink 将阻塞消息接收,直到缓存消息数目小于限制为止。 -- **启用缓存**:设置是否启用缓存,可选值 True、False -- **停止时清理缓存**:设置停止时是否清理缓存,可选值 True、False -- **内存缓存阈值**:内存中缓存的最大消息数。 -- **最大磁盘缓存**:缓存在磁盘中的最大消息数。 -- **缓冲区页面大小**:缓冲区页的消息数,单位为批量读/写磁盘,防止频繁 IO。 -- **重发间隔**:重新发送缓存消息的时间间隔(毫秒)。 -- **是否异步运行**:设置是否异步运行输出操作以提升性能。请注意,异步运行时,将无法保证输出结果的顺序。 完成设置后,可点击**测试连接**确认连接情况。最后点击**提交**,完成设置。 diff --git a/zh_CN/streaming-processing/sink/rest.md b/zh_CN/streaming-processing/sink/rest.md index 7e57265..b2a011a 100644 --- a/zh_CN/streaming-processing/sink/rest.md +++ b/zh_CN/streaming-processing/sink/rest.md @@ -21,7 +21,7 @@ - 如**消息体类型**设为 `html`,`xml` 和 `javascript`,还应在**数据模版**处进行相应配置。 - **超时**:HTTP 请求超时的时间(毫秒),默认为 5000 毫秒 -- **HTTP 头**:HTTP 请求设置的其它 HTTP 头。支持动态获取。您可在可视化模式下添加键值对,或在文本模式下直接添加,例如 `{"key":"{{value}}"}` +- **HTTP 头**:HTTP 请求设置的其它 HTTP 头。支持动态获取。 - **证书路径**:可以为绝对路径,也可以为相对路径。如果指定的是相对路径,那么父目录为执行 kuiperd 命令的路径。比如,如果你在 `/var/kuiper` 中运行 `bin/kuiperd`,那么父目录为 `/var/kuiper`; 如果运行从 `/var/kuiper/bin` 中运行 `./kuiperd`,那么父目录为 `/var/kuiper/bin` - **私钥路径**:私钥路径。可以为绝对路径,也可以为相对路径。 - **根证书路径**:根证书路径,用以验证服务器证书。可以为绝对路径,也可以为相对路径。 @@ -40,29 +40,11 @@ - url:刷新令牌的网址,总是使用 POST 方式请求。 - headers:用于刷新令牌的请求头。通常把令牌放在这里,用于授权。 - body:刷新令牌的请求主体。当使用头文件来传递刷新令牌时,可能不需要配置此选项。 -- **是否忽略输出**:可选值 True、False,默认为 False,则忽略输出。 -- **将结果数据按条发送**: - - 默认为 false,输出格式为 `{"result":"${the string of received message}"}`, 例如,`{"result":"[{"count":30},""count":20}]"}` - - 如设为 true,结果消息将与实际字段名称一一对应发送。 对于上述示例,它将先发送 `{"count":30}`,再发送 `{"count":20}` -- **流格式**:支持 json、binary、protobuf、delimited、custom。 - - 如选择 protobuf 或 custom,还应配置对应的[模式和模式消息](../config.md) - - 如选择 delimited,还应配置分隔符,如 "," +- **是否忽略输出**:默认为 False。 +- **将结果数据按条发送**:默认为 True。 +- **流格式**:默认为 `json`。 - **数据模版**:Golang 模板,用于指定输出数据格式。如不指定数据模板,则将数据作为原始输入。关于数据模版的详细介绍,见 [数据模版](./data_template.md) -## 通用配置 - -您可点击展开**高级**部分实现更加定制化的设置。 - -- **线程数**:设置运行的线程数。该参数值大于 1 时,消息发出的顺序可能无法保证。 -- **缓存大小**:设置可缓存消息数目。若缓存消息数超过此限制,sink 将阻塞消息接收,直到缓存消息数目小于限制为止。 -- **启用缓存**:设置是否启用缓存,可选值 True、False -- **停止时清理缓存**:设置停止时是否清理缓存,可选值 True、False -- **内存缓存阈值**:内存中缓存的最大消息数。 -- **最大磁盘缓存**:缓存在磁盘中的最大消息数。 -- **缓冲区页面大小**:缓冲区页的消息数,单位为批量读/写磁盘,防止频繁 IO。 -- **重发间隔**:重新发送缓存消息的时间间隔(毫秒)。 -- **是否异步运行**:设置是否异步运行输出操作以提升性能。请注意,异步运行时,将无法保证输出结果的顺序。 - 完成设置后,可点击**测试连接**确认连接情况。最后点击**提交**,完成设置。 ## 数据格式 diff --git a/zh_CN/streaming-processing/sink/sink.md b/zh_CN/streaming-processing/sink/sink.md index 6dc5fd8..199d8de 100644 --- a/zh_CN/streaming-processing/sink/sink.md +++ b/zh_CN/streaming-processing/sink/sink.md @@ -7,21 +7,72 @@ 用户可以直接使用 NeuronEX 的内置动作。动作类型的列表如下: - [MQTT](./mqtt.md):输出到外部 MQTT 服务。 -- [Neuron](./neuron.md):输出到NeuronEX的数采引擎模块。 +- [Neuron](./neuron.md):输出到NeuronEX的数采模块。 - [REST](./rest.md):输出到外部 HTTP 服务器。 -- [Redis](./redis.md): 写入 Redis。 -- [文件](./file.md): 写入文件。 -- [内存](./memory.md):输出到内存主题以形成规则管道。 +- [内存](./memory.md):输出到内存主题以形成规则流水线。 - [Log](./log.md):写入日志,通常只用于调试。 -- [Nop](./nop.md):不输出,用于性能测试。 - [SQL](./sql.md):写入 SQL 数据库。 - [InfluxDB](./influx.md): 写入 InfluxDB `v1.x`。 - [InfluxDB V2](./influx2.md): 写入 InfluxDB `v2.x`。 -- [Kafka](./kafka.md):输出到 Kafka 。 +- [文件](./file.md): 写入文件。 +- [Nop](./nop.md):不输出,用于性能测试。 + + -## 公共属性 +## 动作公共参数配置 + +- **Resource ID** + + 选择资源 ID 以实现配置的复用。 + + 像数据源一样,动作 (Sink) 也支持配置复用,您可在**数据流处理** -> **配置** -> **资源** 的传输与存储模版页签查看已添加的配置,编辑或删除配置组,也可点击右上角的添加传输与存储模版按钮进行添加。 + +- **是否忽略输出** + + 默认为 false。 + - `是否忽略输出`为 `true`,当规则 SQL的处理结果为空时,则忽略输出。 + - `是否忽略输出`为 `false`,当规则 SQL的处理结果为空时,输出`{}`。 + +- **将结果数据按条发送** + + - 默认为 true,结果消息将一一对应发送,输出格式一般为`JSON` 格式。 + - 如设为 false,结果消息将根据规则 SQL 的处理结果组合发送。输出格式一般为包裹 JSON 的`数组` 格式。 + +- **流格式** + + 用于定义传入的数据类型,支持 `json`、`protobuf`、`binary`、`delimited` 和 `custom`,默认为 `JSON` 。以下为其中部分流格式的介绍: + + - delimited + + 对于 CSV 文件数据源,需选择 `delimited` 格式,还应指定分隔符来区分数据字段,如 "`,`" + + - protobuf + + Protobuf 是一种序列化结构数据的方式,当流格式设置为 `protobuf` 时,还应配置解码时使用的模式。模式可在 **数据流处理** -> **配置** -> **模式**中定义。有关模式的详细介绍,请查阅 [模式](../config.md#模式) 章节。 + + - Binary + + 对于二进制数据流,例如图像或者视频流,需要指定数据格式为 `binary` 。 + + - custom + + `custom` 是由用户自定义的数据格式 。 + + +- **数据模版** + + 数据模板支持用户对分析结果进行"二次处理",以满足不同接收系统的多样化格式要求。利用 Golang 模板系统,提供了动态数据转换、条件输出和迭代处理的机制,确保了与各种接收器的兼容性和精确格式化。 + 关于数据模版的详细介绍,请查阅 [数据模版](./data_template.md) 章节。 + + :::tip 提示 + **数据模版**为可选配置。如不指定数据模板,则将规则处理的结果正常输出。 + ::: + + + + -## 数据模板 -[数据模板](data_template.md) 支持用户对分析结果进行"二次处理",以满足不同接收系统的多样化格式要求。利用 Golang 模板系统,提供了动态数据转换、条件输出和迭代处理的机制,确保了与各种接收器的兼容性和精确格式化。 ## 动态属性 @@ -70,15 +119,25 @@ 需要注意的是,上例中的 `sendSingle` 属性已设置。在默认情况下,目标接收到的是数组,使用的 jsonpath 需要采用 {{index . 0 "topic"}}。 -## 缓存 +## 高级配置 + +您可点击展开**高级**部分实现更加定制化的设置。 -动作用于将处理结果发送到外部系统中,但在诸如弱网情况下,边到云的网络连接不稳定,因此针对外部系统不可用的情况,NeuronEX 中的动作提供了缓存功能,用于在发送错误的情况下暂存数据,并在错误恢复之后自动重发缓存数据。动作的缓存可分为内存和磁盘两级存储。用户可配置内存缓存条数,超过上限后,新的缓存将离线存储到磁盘中,通过同时利用内存和磁盘空间,实现更大的缓存容量;此外,动作还将持续检测故障恢复状态,支持在不重启规则的情况下重新发送。 + +- **线程数**:设置运行的线程数。该参数值大于 1 时,消息发出的顺序可能无法保证。 +- **缓存大小**:设置可缓存消息数目。若缓存消息数超过此限制,sink 将阻塞消息接收,直到缓存消息数目小于限制为止。 +- **是否启用缓存**:设置是否启用缓存,可选值 True、False +- **停止时是否清理缓存**:设置停止时是否清理缓存,可选值 True、False +- **内存缓存阈值**:内存中缓存的最大消息数。 +- **最大磁盘缓存**:缓存在磁盘中的最大消息数。 +- **缓冲区页面大小**:缓冲区页的消息数,单位为批量读/写磁盘,防止频繁 IO。 +- **重发间隔**:重新发送缓存消息的时间间隔(毫秒)。 -每个动作 (Sink) 将有一个唯一的 sqlite 表来保存缓存。缓存的计数添加到动作 (Sink) 指标中的 buffer length 部分。 -### 流程 +## 数据缓存 + +NeuronEX 中的动作提供了缓存功能,用于在发送错误的情况下暂存数据,并在错误恢复之后自动重发缓存数据。动作的缓存可分为内存和磁盘两级存储。用户可配置内存缓存条数,超过上限后,新的缓存将离线存储到磁盘中,通过同时利用内存和磁盘空间,实现更大的缓存容量;此外,动作还将持续检测故障恢复状态,支持在不重启规则的情况下重新发送。 -虽然每个 sink 都可以配置自己的缓存机制,但整体而言,启用缓存后,所有的 sink 事件都会经过两个阶段:首先是将所有内容保存到缓存中;然后在收到 ack 后删除缓存。 - 错误检测:发送失败后,sink 应该通过返回特定的错误类型来识别可恢复的失败(网络等),这将返回一个失败的 ack,并触发缓存。成功发送后,或错误不可恢复的情况下,将发送一个成功的 ack 来删除缓存。 - 缓存机制:缓存将首先被保存在内存中。如果超过了内存的阈值,后面的缓存将被保存到磁盘中。一旦磁盘缓存超过磁盘存储阈值,缓存将开始 rotate,即内存中最早的缓存将被丢弃,并加载磁盘中最早的缓存来代替。 @@ -117,22 +176,18 @@ } ``` -## 传输与存储模版 - -像数据源一样,动作 (Sink) 也支持配置复用,您可在数据流处理 -> 配置 -> 资源 的传输与存储模版页签查看已添加的配置,编辑或删除配置组,也可点击右上角的添加传输与存储模版按钮进行添加。 - ## 更新动作 (Sink) -默认情况下,Sink 将数据附加到外部系统中。一些系统,如内存,支持更新或删除数据。与查找源类似,只有少数 Sink 是天然 "可更新 "的。可更新的 Sink 必须支持插入、更新和删除。产品自带的 Sink 中,可更新的包括: +默认情况下,Sink 将数据附加到外部系统中。一些系统,如内存,支持更新或删除数据。与查找源类似,只有少数 Sink 是`updateble`的。`updateble` Sink 必须支持插入、更新和删除。NeuronEX 内置的`updateble`类型的 Sink 包括: - [内存](./memory.md) -- [Redis](./redis.md) - [SQL](./sql.md) + 为了激活更新功能,Sink 必须设置 `rowkindField` 属性,以指定数据中的哪个字段代表要采取的动作。在下面的例子中,`rowkindField` 被设置为 `action`。 ```json -{"redis": { +{"sql": { "addr": "127.0.0.1:6379", "dataType": "string", "field": "id", diff --git a/zh_CN/streaming-processing/sink/sql.md b/zh_CN/streaming-processing/sink/sql.md index 17edbde..9859f38 100644 --- a/zh_CN/streaming-processing/sink/sql.md +++ b/zh_CN/streaming-processing/sink/sql.md @@ -1,5 +1,7 @@ # SQL 目标(Sink) +updatable + 此插件将结果写入 SQL 数据库。 ## 编译部署插件 diff --git a/zh_CN/streaming-processing/source.md b/zh_CN/streaming-processing/source.md index 6f9d163..4f3cdd8 100644 --- a/zh_CN/streaming-processing/source.md +++ b/zh_CN/streaming-processing/source.md @@ -1,43 +1,51 @@ # 数据源 -源(source)用于从外部系统中读取数据。数据源既可以是无界的流式数据,即流;也可以是有界的批量数据,即表。在规则中使用时,至少有一个源必须为流。源(source)获取数据后,还会根据定义的数据模型进行数据解码和转换。 +数据源(Source)用于从外部系统中读取数据。NeuronEX 支持将数据源加载为三种模式:`流(Stream)`、`扫描表(Scan Table)`、`查询表(Lookup Table)`。 -关于数据加载机制,有两种模式: -- 扫描:像一个由事件驱动的流一样,一个一个地加载数据事件。这种模式的源可以用在流或扫描表中。 -- 查找:在需要时引用外部内容,只用于查找表。 - -每个源将支持一种或两种模式。在源页面上,如果支持该模式,会有一个徽章显示。 +一般的数据源都可以选择加载为`流(Stream)`,表(包括`扫描表`和`查询表`)是保留大量的数据状态的方法,一般用来和`流(Stream)`结合使用,以应对复杂的处理场景。 +:::tip 提示 +在使用`规则(Rule)`时,至少有一个源必须为`流(Stream)`。 +::: ## 数据源类型 -目前 NeuronEX 内置以下数据源类型: +目前 NeuronEX 内置以下数据源类型,以及它们支持的加载模式: | 名称 | 描述 | 流 | 扫描表 | 查询表 | | --------------------------- | ---------------------------------- | ---- | ------ | ------ | +| [Neuron](./neuron.md) | 从NeuronEX的数采模块读取数据 | ✅ | ✅ |❌ | | [MQTT](./mqtt.md) | 从 MQTT 主题读取数据 | ✅ | ✅ | ❌ | | [HTTP pull](./http_pull.md) | 从 HTTP 服务器中拉取数据 | ✅ | ✅ | ❌ | | [HTTP push](./http_push.md) | 通过 HTTP 推送数据到 NeuronEX | ✅ | ✅ | ❌ | -| [内存](./memory.md) | 从 NeuronEX 内存主题读取数据以形成规则管道 | ✅ | ✅ | ✅ | -| [Neuron](./neuron.md) | 从NeuronEX的数采引擎模块读取数据 | ✅ | ✅ |❌ | -| [文件](./file.md) | 从文件中读取数据 | ✅ | ✅ | ❌ | +| [内存](./memory.md) | 从 NeuronEX 内存主题读取数据以形成规则流水线 | ✅ | ✅ | ✅ | | [SQL](./sql.md) | 从数据库中查询数据 | ✅ | ✅ | ✅| +| [文件](./file.md) | 从文件中读取数据 | ✅ | ✅ | ❌ | | [Video](./video.md) | 从视频流中查询数据 | ✅ | ✅ | ❌ | -| [Redis](./redis.md) | 从 Redis 中查询数据 | ❌ | ❌ | ✅ | + ## 定义和运行 -在 NeuronEX 中,定义数据源的流或者表之后,系统实际上只是创建了一个数据源的逻辑定义而非真正物理运行的数据输入。此逻辑定义可在多个规则的 SQL 的 `from` 子句中使用。只有当使用了该定义的规则启动之后,数据流才会真正运行。 +在 NeuronEX 中,在**源管理**页面创建了流或者表之后,实际上只是创建了一个数据源的逻辑定义而非真正物理运行的数据输入。只有当使用了该数据源的规则启动之后,数据流才会真正运行。 -默认情况下,多个规则使用同一个源的情况下,每个规则会启动一个独立的源的运行时,与其他规则中的同名源完全隔离。若多个规则需要使用完全相同的输入数据或者提高性能,源可定义为共享源,从而在多个规则中共享同一个实例。 +:::tip 提示 +在**源管理**页面创建的流或者表,可在多个规则的 SQL 的 `from` 子句中使用。 +::: -## 解码 +用户可以在创建 **数据源(Source)** 时通过指定`共享(SHARED)`字段来定义是否共享数据源。 -用户可以在创建源时通过指定流格式属性来定义解码方式。当前支持 `json`、 `binary`、`protobuf`、`delimited`,如希望使用自定义格式,也可以选择 `custom`。 + +## 数据源解码 + +用户可以在创建 **数据源(Source)** 时通过指定`流格式`字段来定义解码方式。当前支持 `json`、 `binary`、`protobuf`、`delimited`,如希望使用自定义格式,也可以选择 `custom`。 +:::tip 提示 +数据源进入规则处理前,会先进行解码,解码后的数据会作为规则的输入。 +::: ## 数据结构 -用户可以像定义关系数据库表结构一样定义数据源的结构。部分数据格式本身带有数据结构,例如 `protobuf` 格式。用户在创建源时可以定义 **模式名称** 来指向模式注册表 ( Schema Registry ) 中的数据结构定义。 +NeuronEX 支持带结构/无结构的流,默认为无结构。即在 **源管理** -> **创建流** 时,`是否为带结构的流`选项不打勾。详见[流参数配置](./stream.md#流参数配置)。 + + -其中,模式注册表中的定义为物理数据结构,而数据源定义语句中的数据结构为逻辑数据结构。若两者都有定义,则物理数据结构将覆盖逻辑数据结构。此时,数据结构的验证和格式化将有定义的格式负责,例如 `protobuf`。若只定义了逻辑结构而且设定了 `strictValidation`,NeuronEX 在运行时,会根据定义的结构进行数据验证和类型转换。若未设置验证,则逻辑数据结构主要用于编译和加载时的 SQL 语句验证。若输入数据为预处理过的干净数据或者数据结构未知或不固定,用户可不定义数据结构,从而也可以避免数据转换的开销。 diff --git a/zh_CN/streaming-processing/sql.md b/zh_CN/streaming-processing/sql.md index be4f308..0290c98 100644 --- a/zh_CN/streaming-processing/sql.md +++ b/zh_CN/streaming-processing/sql.md @@ -4,133 +4,93 @@ 扫描表 查询表 -源将定期查询数据库以获取数据流。 +NeuronEX 数据处理模块通过 `SQL` 类型的数据源,支持对接`sqlserver`、`postgres`、`mysql`、`sqlite3`和`oracle`等数据库,可以定期查询数据库以获取数据流。 -## 编译和部署插件 +## 创建流 -此插件必须与至少一个数据库驱动程序一起使用。我们使用构建标签来确定将包含哪个驱动程序。[此处](https://github.com/lf-edge/ekuiper/tree/master/extensions/sqldatabase/driver)列出了所有支持的驱动程序。 +登录 NeuronEX,点击**数据流处理** -> **源管理**。在**流管理**页签,点击**创建流**。 -该插件默认支持 `sqlserver\postgres\mysql\sqlite3\oracle` 驱动。用户可以自己编译只支持一个驱动的插件,例如如果他只想要sqlserver,那么他可以用 build tag sqlserver 构建。 +在弹出的**源管理** / **创建**页面,进入如下配置: -### 默认构建命令 +- **流名称**:输入流名称 +- **是否为带结构的流**:勾选确认是否为带结构的流,如为带结构的流,则需进一步添加流字段。可默认不勾选。 +- **流类型**:选择 SQL +- **配置组**:可编辑使用默认配置组,或点击添加配置组,在弹出的对话框中进行如下设置,设置完成后,可点击**测试连接**进行测试: -```shell -# cd $eKuiper_src -# go build -trimpath --buildmode=plugin -o plugins/sources/Sql.so extensions/sources/sql/*.go -# cp plugins/sources/Sql.so $eKuiper_install/plugins/sources -``` - -### Sqlserver 构建命令 - -```shell -# cd $eKuiper_src -# go build -trimpath --buildmode=plugin -tags sqlserver -o plugins/sources/Sql.so extensions/sources/sql/*.go -# cp plugins/sources/Sql.so $eKuiper_install/plugins/sources -``` + - **名称**:输入配置组名称。 + - **数据库地址**:数据库的连接地址,各类数据库的详细配置请参考 [数据库连接地址](#数据库连接地址) 。 + - **间隔时间**:发出查询的时间间隔(毫秒)。 + - **TemplateSql**:SQL 语句模板,详见[SQL 语句模板示例](#sql-语句模板示例)。 + - **indexField**:可选参数,表的哪一列作为索引来记录偏移量。 + - **indexValue**:可选参数,初始索引值,如果用户指定该字段,查询将使用这个初始值作为查询条件,当获得更大的值时将更新下一个查询,需要为数字类型的值。 + - **indexFieldType**:可选参数,索引字段的列类型,如果是 dateTime 类型,必须将该字段设置为 DATETIME。 + - **dateTimeFormat**:可选参数,索引字段的时间格式。 +- **流格式**:默认 json 格式。 +- **共享**:勾选确认是否共享源。 -重启 eKuiper 服务器以激活插件 +### 数据库连接地址 -## 配置 - -这个数据流的配置文件位于 `$ekuiper/etc/sources/sql.yaml`. 格式如下: - -```yaml -default: - interval: 10000 - url: mysql://user:test@140.210.204.147/user?parseTime=true - internalSqlQueryCfg: - table: test - limit: 1 - indexField: registerTime - indexValue: "2022-04-21 10:23:55" - indexFieldType: "DATETIME" - dateTimeFormat: "YYYY-MM-dd HH:mm:ss" - -sqlserver_config: - url: sqlserver://username:password@140.210.204.147/testdb - internalSqlQueryCfg: - table: Student - limit: 10 - indexField: id - indexValue: 1000 - -template_config: - templateSqlQueryCfg: - TemplateSql: "select * from table where entry_data > {{.entry_data}}" - indexField: entry_data - indexValue: "2022-04-13 06:22:32.233" - indexFieldType: "DATETIME" - dateTimeFormat: "YYYY-MM-dd HH:mm:ssSSS" -``` - -### 全局配置 - -用户可以在此处指定全局 sql 源设置。`default` 部分中指定的配置项将在运行此源时作为源的默认设置。 - -### interval - -发出查询的时间间隔(毫秒) - -### url - -目标数据库地址 +数据库连接地址参考: | database | url sample | | ---------- | ----------------------------------------------------- | -| mysql | mysql://user:test@140.210.204.147/user?parseTime=true | -| sql server | sqlserver://username:password@140.210.204.147/testdb | -| postgres | postgres://user:pass@localhost/dbname | -| postgres | postgres://user:pass@localhost/dbname | -| sqlite | sqlite:/path/to/file.db | - -### internalSqlQueryCfg - -* `table`: 要查询的表名 -* `limit`: 需要从结果中获取多少条目 -* `indexField`: 表的哪一列作为索引来记录偏移量 -* `indexValue`: 初始索引值,如果用户指定该字段,查询将使用这个初始值作为查询条件,当获得更大的值时将更新下一个查询 -* `indexFieldType`: 索引字段的列类型,如果是 dateTime 类型,必须将该字段设置为 `DATETIME` -* `dateTimeFormat`: 索引字段的时间格式 - -| table | limit | indexField | indexValue | indexFieldType | dateTimeFormat | sql query statement | -| ------- | ----- | ------------ | --------------------- | -------------- | --------------------- | --------------------------------------------------------------------------------------------------- | -| Student | 10 | | | | | select * from Student limit 10 | -| Student | 10 | stun | 100 | | | select * from Student where stun > 100 limit 10 | -| Student | 10 | registerTime | "2022-04-21 10:23:55" | "DATETIME" | "YYYY-MM-dd HH:mm:ss" | select * from Student where registerTime > '2022-04-21 10:23:55' order by registerTime ASC limit 10 | - -### templateSqlQueryCfg - -* `TemplateSql`: sql语句模板 -* `indexField`: 表的哪一列作为索引来记录偏移量 -* `indexValue`: 同上 -* `indexFieldType`: 同上 -* `dateTimeFormat`: 同上 - -::: v-pre -| TemplateSql | indexField | indexValue | indexFieldType | dateTimeFormat | sql query statement | -| ------------------------------------------------------------------------------------------------- | ------------ | --------------------- | -------------- | --------------------- | --------------------------------------------------------------------------------------------------- | -| select * from Student limit 10 | | | | | select * from Student limit 10 | -| select * from Student where stun > {{.stun}} limit 10 | stun | 100 | | | select * from Student where stun > 100 limit 10 | -| select * from Student where registerTime > '{{.registerTime}}' order by registerTime ASC limit 10 | registerTime | "2022-04-21 10:23:55" | "DATETIME" | "YYYY-MM-dd HH:mm:ss" | select * from Student where registerTime > '2022-04-21 10:23:55' order by registerTime ASC limit 10 | -::: - -### *注意*: 用户只需要设置 internalSqlQueryCfg 或 templateSqlQueryCfg,如果两者都设置,将使用 templateSqlQueryCfg - -## 覆盖默认配置 - -如果您有需要覆盖默认设置的特定连接,您可以创建自定义部分。在前面的示例中,我们创建了一个名为 `template_config` 的特定设置。然后,您可以在创建流定义时使用选项 `配置组` 指定配置。 - -## 使用样例 - -```text -demo ( - ... - ) WITH (DATASOURCE="demo", FORMAT="JSON", CONF_KEY="template_config", TYPE="sql"); -``` - -将使用配置键 `template_config` - -## 查询表 +| mysql | mysql://username:password@127.0.0.1:3306/testdb?parseTime=true | +| sql server | sqlserver://username:password@127.0.0.1:1433/testdb | +| postgres | postgres://username:password@127.0.0.1:5432/testdb | +| oracle | oracle://username:password@127.0.0.1:1521/testdb | +| sqlite | sqlite:/tmp/test.db | + +### SQL 语句模板示例 + +- 通过单独使用`TemplateSql`配置项,获取数据库数据。 + + TemplateSql输入: + ```sql + select top 10 * from Student where id > 1010 order by id ASC + ``` + 向数据库执行的输出: + ```sql + select top 10 * from Student where id > 1010 order by id ASC + ``` + +- 通过`TemplateSql`配置项和`indexField`、`indexValue`组合使用,获取数据库数据。 + + indexField输入:`stun` + + indexValue输入:`100` + + TemplateSql输入: + ```sql + select * from Student where stun > {{.stun}} limit 10 + ``` + 向数据库执行的输出: + ```sql + select * from Student where stun > 100 limit 10 + ``` +- 通过`TemplateSql`配置项和`indexField`、`indexValue`、`indexFieldType`、`dateTimeFormat`组合使用,获取数据库数据。 + + indexField输入:`registerTime` + + indexValue输入:`2022-04-21 10:23:55` + + indexFieldType:`DATETIME` + + dateTimeFormat:`YYYY-MM-dd HH:mm:ss` + + TemplateSql输入: + ```sql + select * from Student where registerTime > '{{.registerTime}}' order by registerTime ASC limit 10 + ``` + 向数据库执行的输出: + ```sql + select * from Student where registerTime > '2022-04-21 10:23:55' order by registerTime ASC limit 10 + ``` + +## 创建扫描表 + +请参考[创建流](#创建流)部分。 + +## 创建查询表 SQL 源支持成为一个查询表。我们可以使用创建表语句来创建一个 SQL 查询表。它将与实体关系数据库绑定并按需查询。 diff --git a/zh_CN/streaming-processing/sqls/lexical_elements.md b/zh_CN/streaming-processing/sqls/lexical_elements.md index 4d0286c..3cb3aee 100644 --- a/zh_CN/streaming-processing/sqls/lexical_elements.md +++ b/zh_CN/streaming-processing/sqls/lexical_elements.md @@ -34,7 +34,7 @@ SELECT `a-b`, `hello world`, `中文Chinese` from demo SELECT, FROM, JOIN, LEFT, INNER, ON, WHERE, GROUP, ORDER, HAVING, BY, ASC, DESC, AND, OR, CASE, WHEN, THEN, ELSE, END, IN, NOT, BETWEEN, LIKE, OVER, PARTITION ``` -以下是使用名为 `from` 的流的示例,`from` 是数据处理引擎中的保留关键字。 +以下是使用名为 `from` 的流的示例,`from` 是数据处理模块中的保留关键字。 ```sql SELECT * FROM demo1 where `from`="device1" diff --git a/zh_CN/streaming-processing/sqls/overview.md b/zh_CN/streaming-processing/sqls/overview.md index ab0ee33..b1e5abc 100755 --- a/zh_CN/streaming-processing/sqls/overview.md +++ b/zh_CN/streaming-processing/sqls/overview.md @@ -1,6 +1,6 @@ # SQL 参考 -NeuronEX 数据处理引擎提供了一种类似于 SQL 的查询语言,用于对事件流执行转换和计算。 本文介绍数据处理引擎查询语言的语法、用法和最佳实践。 +NeuronEX 数据处理模块提供了一种类似于 SQL 的查询语言,用于对数据流执行转换和计算。 本文介绍数据处理模块查询语言的语法、用法和最佳实践。 - [查询语言元素](query_language_elements.md) - [词汇元素](lexical_elements.md) diff --git a/zh_CN/streaming-processing/sqls/query_language_elements.md b/zh_CN/streaming-processing/sqls/query_language_elements.md index ea9ff58..0d6e9c0 100755 --- a/zh_CN/streaming-processing/sqls/query_language_elements.md +++ b/zh_CN/streaming-processing/sqls/query_language_elements.md @@ -1,7 +1,7 @@ # 查询语言元素 -NeuronEX 数据处理引擎提供了用于构建查询的各种元素。 总结如下。 +NeuronEX 数据处理模块提供了用于构建查询的各种元素。 总结如下。 | 元素 | 总结 | |-----------------------|--------------------------------------------------------------------------------------------------------------------------------| diff --git a/zh_CN/streaming-processing/stream.md b/zh_CN/streaming-processing/stream.md index a20ac52..42b200c 100644 --- a/zh_CN/streaming-processing/stream.md +++ b/zh_CN/streaming-processing/stream.md @@ -1,6 +1,6 @@ # 流管理 -流是 NeuronEX 中数据源连接器的运行形式,用户可通过指定源类型来定义如何连接到外部资源。流的作用就像规则的触发器,每个事件都会触发规则中的计算。 +流是 NeuronEX 中数据源接入的主要运行方式,用户可通过选择数据源类型及配置参数来定义如何连接到外部资源。数据流中有数据流入时,都会触发规则中的计算。 ## 流数据源 @@ -8,13 +8,13 @@ | 名称 | 描述 | | --------------------------- | ------------------------------------------ | +| [Neuron](./neuron.md) | 从 NeuronEX 数采模块读取数据 | | [MQTT](./mqtt.md) | 从 MQTT 主题读取数据 | | [HTTP pull](./http_pull.md) | 从 HTTP 服务器中拉取数据 | | [HTTP push](./http_push.md) | 通过 HTTP 推送数据到 NeuronEX | -| [内存](./memory.md) | 从 NeuronEX 内存主题读取数据以形成规则管道 | -| [Neuron](./neuron.md) | 从NeuronEX的数采引擎模块读取数据 | +| [内存](./memory.md) | 从 NeuronEX 内存主题读取数据以形成规则流水线 | +| [SQL](./sql.md) | 从数据库中查询数据 | | [文件](./file.md) | 从文件中读取数据 | -| [SQL](./redis.md) | 从数据库中查询数据 | | [Video](./video.md) | 从视频流中查询数据 | ## 创建流 @@ -25,36 +25,52 @@ ### 流参数配置 - - **是否为带结构的流** -NeuronEX 支持带结构/无结构的流,默认为无结构。用户无需定义任何形式的 schema,主要用于弱结构化数据流,或数据结构经常变化的情况。详细说明请参考[数据结构](#数据结构)。 + NeuronEX 支持带结构/无结构的流,默认为无结构。即在 **源管理** -> **创建流** 时,`是否为带结构的流`选项不打勾。详细说明请参考[数据结构](#数据结构)。 + - 无结构的流 + + Schemaless,用户无需定义任何形式的 schema,主要用于弱结构化数据流,或数据结构经常变化的情况。 + + - 带结构的流 + + 用户在 数据源(Source) 层定义数据schema。适用于用户的数据有固定或大致固定的格式。 + + :::tip 提示 + 部分数据格式本身带有数据结构,例如 `protobuf` 格式。用户在创建源时可以定义 `流格式` 来指向模式注册表 ( Schema Registry ) 中的数据结构定义。此时,数据源的数据结构将会被模式注册表中的定义覆盖。有关模式的详细介绍,查阅 [模式](./config.md#模式) 章节。 + ::: - **流类型** -NeuronEX 支持多种流类型,具体可参考 [流数据源](#流数据源)。 + NeuronEX 支持多种流类型,具体可参考 [流数据源](#流数据源)。 - **数据源** -取决于不同的数据源类型;如果是 MQTT 源,则为 MQTT 数据源主题名;其它源请参考相关文档。 + 取决于不同的数据源类型;如果是 MQTT 源,则为 MQTT 数据源主题名;其它源请参考相关文档。 - **配置组** -定义各类型数据源的相关配置项,具体可参考相关文档,每个数据源均提供了 default 配置组,可供参考。 + 定义各类型数据源的相关配置项,具体可参考相关文档,每个数据源均提供了 `default` 配置组,可供参考。 - **流格式** - 用于定义传入的数据类型,支持 `json`、`protobuf`、`binary`、`delimited` 和 `custom`,默认为 `JSON` 。该属性是否生效取决于源的类型,某些源自身解析的时固定私有格式的数据,则该配置不起作用。以下为其中部分流格式的介绍: + 用于定义传入的数据类型,支持 `json`、`protobuf`、`binary`、`delimited` 和 `custom`,默认为 `json` 。以下为其他流格式的介绍: - delimited - 如选择 delimited 格式,还应指定分隔符来区分数据字段,如 "," + + 对于 CSV 文件数据源,需选择 `delimited` 格式,还应指定分隔符来区分数据字段,如 "`,`" - protobuf - Protobuf 是一种序列化结构数据的方式,它更加简洁高效,能够实现数据的高效序列化。这种二进制格式的消息更小,更快,同时也更简单。当流格式设置为 protobuf 时,还应配置解码时使用的模式。模式可在 数据流处理 -> 配置 -> 模式定义。有关模式的详细介绍,见 [模式](./config.md#模式)。 + Protobuf 是一种序列化结构数据的方式,当流格式设置为 `protobuf` 时,还应配置解码时使用的模式。模式可在 **数据流处理** -> **配置** -> **模式**中定义。有关模式的详细介绍,查阅 [模式](./config.md#模式) 章节。 - Binary - 对于二进制数据流,例如图像或者视频流,需要指定数据格式为 "BINARY" 。 + + 对于二进制数据流,例如图像或者视频流,需要指定数据格式为 `binary` 。 + + - custom + + `custom` 是由用户自定义的数据格式 。