Skip to content

Commit

Permalink
Merge pull request #40 from QQDQ/main
Browse files Browse the repository at this point in the history
update data processing source and rule
  • Loading branch information
QQDQ authored Dec 31, 2023
2 parents a4cab38 + 9940162 commit 1b9a413
Show file tree
Hide file tree
Showing 25 changed files with 207 additions and 141 deletions.
Binary file added zh_CN/streaming-processing/_assets/lookup1.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added zh_CN/streaming-processing/_assets/lookup2.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added zh_CN/streaming-processing/_assets/lookup3.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added zh_CN/streaming-processing/_assets/scan1.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added zh_CN/streaming-processing/_assets/scan2.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified zh_CN/streaming-processing/_assets/stream.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added zh_CN/streaming-processing/_assets/stream1.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added zh_CN/streaming-processing/_assets/stream2.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added zh_CN/streaming-processing/_assets/stream3.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
29 changes: 9 additions & 20 deletions zh_CN/streaming-processing/file.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,15 @@ id name age

该文件第一行为文件头,定义了文件的列名。读取这样的文件时,配置文件如下,需要指定文件有一个头。

```yaml
csv:
fileType: csv
hasHeader: true
```
在流定义中,将流数据设置为 `DELIMITED` 格式,用 `DELIMITER` 参数指定分隔符为空格。

```SQL
create
stream cscFileDemo () WITH (FORMAT="DELIMITED", DATASOURCE="abc.csv", TYPE="file", DELIMITER=" ", CONF_KEY="csv"
```
![source](./_assets/source_file3.png)

通过以上命令,我们创建了一个名为 file1 的流,该流将从 abc.csv 文件中读取数据,预期分隔符为空格。

::: tip 注意
在分隔符配置项中,需要输入空格。
:::

### 读取多行 JSON 数据

Expand All @@ -108,17 +105,9 @@ stream cscFileDemo () WITH (FORMAT="DELIMITED", DATASOURCE="abc.csv", TYPE="file
{"id": 3, "name": "John Smith"}
```

读取这种格式的文件时,配置中的文件类型设置为 `lines`。

```yaml
jsonlines:
fileType: lines
```
读取这种格式的文件时,配置中的文件类型设置为 `lines`,流格式设置为`JSON`格式。

在流定义中,设置流数据为`JSON`格式。
![source](./_assets/source_file2.png)

```SQL
create stream linesFileDemo () WITH (FORMAT="JSON", TYPE="file", CONF_KEY="jsonlines"
```

此外,lines 文件类型可以与任何格式相结合。例如,如果你将格式设置为 protobuf,并且配置模式,它可以用来解析包含多个 Protobuf 编码行的数据。
133 changes: 76 additions & 57 deletions zh_CN/streaming-processing/lookup.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,81 @@
# 查询表

查询表(Lookup Table)用于绑定外部静态数据,可以处理大量的批数据连接的需求。本教程以两个场景为例,介绍了如何使用查询表进行流批结合的计算。我们分别使用了 Redis 和 MySQL 作为外部查询表的类型并展示了如何通过规则动态更新外部存储的数据。用户可以使用查询表工具探索更多的流批结合运算的场景。
查询表(Lookup Table)用于绑定外部静态数据,可以处理大量的批数据连接的需求。本教程以两个场景为例,介绍了如何使用查询表进行流批结合的计算。我们分别使用了 MySQL 和 Redis 作为外部查询表的类型并展示了如何通过规则动态更新外部存储的数据。用户可以使用查询表工具探索更多的流批结合运算的场景。

## 数据补全场景

流数据变化频繁,数据量大,通常只包含需要经常变化的数据;而不变或者变化较少的数据通常存储于数据库等外部存储中。在应用处理时,通常需要将流数据中缺少的静态数据补全。例如,流数据中包含了设备的 ID,但设备的具体名称,型号的描述数据存储于数据库中。本场景中,我们将介绍如何将流数据与批数据结合,进行自动数据补全。

### MySQL 数据库配置

本场景将以 MySQL 为例,介绍如何与关系数据库进行连接。用户需要启动 MySQL 实例。在 MySQL 中创建表 `devices`, 其中包含 `id`, `name`, `deviceKind` 字段。
```sql
CREATE TABLE devices (
id INT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
deviceKind VARCHAR(255) NOT NULL
);
```

提前写入内容,如下:
```sql
INSERT INTO devices (id, name, deviceKind) VALUES
(1, 'Device1', 'Kind1'),
(2, 'Device2', 'Kind2'),
(3, 'Device3', 'Kind1'),
(4, 'Device4', 'Kind4');
```
在该示例中,每一类设备对应的名字,型号等设备信息数据,存储于 MySQL 中。


### 创建查询表

**源管理**->**查询表**页面,创建 SQL source 配置,指向创建的 MySQL 实例。由于 SQL 数据库 IO 延迟较大,用户可配置是否启用查询缓存及缓存过期时间等。

其中数据库地址中,需要填入正确的数据库地址,包括IP、端口、数据库名称、用户名、密码等。

数据源选项框中,需要填入正确的表名`devices`

![lookup](./_assets/lookup1.png)

### 创建数据流

创建了一个数据流 demoStream2 来从 MQTT Broker 的主题 `scene2/data` 中读取 json 数据。这个流实时接收数据流。
![scan](./_assets/lookup2.png)

### 创建数据补全规则

流和表都创建完成后,我们就可以创建补全规则了。

```sql
SELECT * FROM demoStream2 INNER JOIN deviceTable ON demoStream.deviceId = deviceTable.id

```

![scan](./_assets/lookup3.png)

在这个规则中,通过流数据中的 deviceId 字段与设备数据库中的 id 进行匹配连接,并输出完整的数据。用户可以根据需要,在 `select` 语句中选择所需的字段。

通过向 MQTT Broker的主题 `scene2/data`发送json数据,当`demo`数据流中的数据输入,如下:
```json
{
"deviceId": 1,
"value": 2
}

```
则经过该规则处理后的输入,如下:
```json
{
"deviceId": 1,
"name": "device1",
"deviceKind": "Kind1",
"value": 2
}

```



## 动态预警场景

Expand Down Expand Up @@ -94,59 +169,3 @@
{"device":"device22","deviceKind":3,"value":54}
{"device":"device2","deviceKind":1,"value":54}
```

## 数据补全场景

流数据变化频繁,数据量大,通常只包含需要经常变化的数据;而不变或者变化较少的数据通常存储于数据库等外部存储中。在应用处理时,通常需要将流数据中缺少的静态数据补全。例如,流数据中包含了设备的 ID,但设备的具体名称,型号的描述数据存储于数据库中。本场景中,我们将介绍如何将流数据与批数据结合,进行自动数据补全。

### SQL 插件配置


本场景将以 MySQL 为例,介绍如何与关系数据库进行连接。用户需要启动 MySQL 实例。在 MySQL 中创建表 `devices`, 其中包含 `id`, `name`, `deviceKind` 等字段并提前写入内容。

在管理控制台中,创建 SQL source 配置,指向创建的 MySQL 实例。由于 SQL 数据库 IO 延迟较大,用户可配置是否启用查询缓存及缓存过期时间等。

```yaml
lookup:
cache: true # 启用缓存
cacheTtl: 600 # 缓存过期时间
cacheMissingKey: true # 是否缓存未命中的情况
```
### 场景输入
本场景中,我们有两个输入:
- 采集数据流,与场景 1 相同,包含多种设备的实时采集数据。本教程中,采集的数据流通过 MQTT 协议进行实时发送。
- 设备信息表,每一类设备对应的名字,型号等元数据。本教程中,设备信息数据存储于 MySQL 中。
针对这两种输入,我们分别创建流和查询表进行建模。
1. 创建数据流。假设数据流写入 MQTT Topic `scene2/data` 中,则我们可通过以下 REST API 创建名为 `demoStream2` 的数据流。
```json
{"sql":"CREATE STREAM demoStream2() WITH (DATASOURCE=\"scene2/data\", FORMAT=\"json\", TYPE=\"mqtt\")"}
```
2. 创建查询表。假设设备数据存储于 MySQL 数据库 devices 中,创建名为 `deviceTable` 的查询表。CONF_KEY 设置为上一节中创建的 SQL source 配置。
```json
{"sql":"CREATE TABLE deviceTable() WITH (DATASOURCE=\"devices\", CONF_KEY=\"mysql\",TYPE=\"sql\", KIND=\"lookup\")"}
```

### 数据补全规则

流和表都创建完成后,我们就可以创建补全规则了。

```json
{
"id": "ruleLookup",
"sql": "SELECT * FROM demoStream2 INNER JOIN deviceTable ON demoStream.deviceId = deviceTable.id",
"actions": [{
"mqtt": {
"server": "tcp://myhost:1883",
"topic": "rule/lookup",
"sendSingle": true
}
}]
}
```

在这个规则中,通过流数据中的 deviceId 字段与设备数据库中的 id 进行匹配连接,并输出完整的数据。用户可以根据需要,在 `select` 语句中选择所需的字段。
109 changes: 86 additions & 23 deletions zh_CN/streaming-processing/scan.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,45 +4,108 @@

## 数据补全

表的典型用法是作为查找表。示例 SQL 将类似于:
```sql
CREATE TABLE table1 (
id BIGINT,
name STRING
) WITH (DATASOURCE="lookup.json", FORMAT="JSON", TYPE="file");
在这个例子中,创建了一个表 `table1` 来从文件 **lookup.json** 中读取 json 数据。

SELECT * FROM demo INNER JOIN table1 on demo.id = table1.id
```
![scan](./_assets/scan1.png)

在这个例子中,创建了一个表 `table1` 来从文件 *lookup.json* 中读取 json 数据。然后在规则中,将 `table1` 与流 `demo` 连接起来,以便流可以从 id 中查找名称。

*lookup.json* 文件的内容应该是一个对象数组。下面是一个例子:
**lookup.json** 文件的内容是一个对象数组。并放置在了`/opt/neuronex/lookup.json`目录下
```json
[
{
"id": 1541152486013,
"name": "name1"
"id": 1,
"name": "device1"
},
{
"id": 1541152487632,
"name": "name2"
"id": 2,
"name": "device2"
},
{
"id": 1541152489252,
"name": "name3"
"id": 3,
"name": "device3"
}
]
```

然后创建一个规则,在规则中,将 `table1` 与流 `demo` 连接起来,以便流可以从 id 中查找名称。
```sql
SELECT * FROM demo INNER JOIN table1 on demo.id = table1.id
```
![scan](./_assets/scan2.png)

`demo`数据流中的数据输入,如下:
```json
{
"id": 1,
"value": 78
}

```

则经过该规则处理后的输入,如下:
```json
{
"id": 1,
"name": "device1",
"value": 78
}

```

## 按历史状态过滤

在某些情况下,我们可能有一个用于数据的事件流和另一个作为控制信息的事件流。
```sql
CREATE TABLE stateTable (
id BIGINT,
triggered bool
) WITH (DATASOURCE="myTopic", FORMAT="JSON", TYPE="mqtt");
在这个例子中,创建了一个表 stateTable 来从 MQTT Broker 的主题 `table1230` 中读取 json 数据。这个表存储作为控制信息的数据。
![scan](./_assets/scan_control1.png)

创建了一个流 demo 来从 MQTT Broker 的主题 `topic1230` 中读取 json 数据。这个流实时接收数据流。
![scan](./_assets/scan_control2.png)


然后创建一个规则,在规则中,将 `stateTable` 与流 `demo` 连接起来,以下 SQL 表示,将根据`stateTable`表中存储的`id``triggered`字段,来过滤`demo`流中的数据,只有当`triggered``true`时,才会将数据流输出。
```sql
SELECT * FROM demo LEFT JOIN stateTable on demo.id = stateTable.id WHERE triggered=true
```
在此示例中,创建了一个表 `stateTable` 来记录来自 mqtt 主题 *myTopic* 的触发器状态。在规则中,会根据当前触发状态来过滤 `demo` 流的数据。

![scan](./_assets/scan_control3.png)

通过向 MQTT Broker的主题 `table1230`发送json数据,分两次向`stateTable`表中存入数据,如下:
```json
{
"id": 1,
"triggered": true
}
```
```json
{
"id": 2,
"triggered": false
}
```

通过向 MQTT Broker的主题 `topic1230`发送json数据,当`demo`数据流中的数据输入,如下:
```json
{
"id": 1,
"value": 78
}

```
则经过该规则处理后的输入,如下:
```json
{
"id": 1,
"triggered": true,
"value": 78
}

```

`demo`数据流中的数据输入,如下:
```json
{
"id": 2,
"value": 78
}

```
则经过该规则处理后,由于`stateTable`表中的`id`为2时,`triggered``false`,所以不会输出数据。
2 changes: 1 addition & 1 deletion zh_CN/streaming-processing/simulator.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@ NeuronEX 数据处理模块通过 `Simulator` 类型的数据源,可以生成
- **流格式**:选择 json 格式。
- **共享**:勾选确认是否共享源。


![source](./_assets/source_simulator.png)
Loading

0 comments on commit 1b9a413

Please sign in to comment.