Skip to content

Commit

Permalink
Merge branch 'main' into gt-0ec7b8c1-7889-4a87-a5ba-3fcaa15afebc
Browse files Browse the repository at this point in the history
  • Loading branch information
Chasen-Zhang authored Jan 25, 2025
2 parents bef879f + cfa2c70 commit b967a81
Show file tree
Hide file tree
Showing 2 changed files with 272 additions and 53 deletions.
109 changes: 56 additions & 53 deletions docs/cn/guides/40-load-data/05-continuous-data-pipelines/01-stream.md
Original file line number Diff line number Diff line change
@@ -1,40 +1,40 @@
---
title: 通过流跟踪和转换数据
sidebar_label:
title: 通过Stream跟踪和转换数据
sidebar_label: Stream
---

import StepsWrap from '@site/src/components/StepsWrap';
import StepContent from '@site/src/components/Steps/step-content';

在 Databend 中,流是对表更改的动态和实时表示。流被创建以捕获和跟踪对关联表的修改,允许在数据更改发生时持续消费和分析数据变化
Databend中的Stream是表变更的动态实时表示。创建Stream是为了捕获和跟踪关联表的修改,允许在数据变更发生时持续消费和分析这些变更

### 流的工作原理
### Stream的工作原理

流可以以两种模式运行**标准****仅追加**在创建流时使用 `APPEND_ONLY` 参数(默认为 `false`)指定模式。
Stream可以以两种模式运行**标准****仅追加**[CREATE STREAM](/sql/sql-commands/ddl/stream/create-stream)时,使用`APPEND_ONLY`参数(默认为`true`)指定模式。

- **标准**捕获所有类型的数据更改,包括插入、更新和删除。
- **仅追加**:在此模式下,流仅包含数据插入记录;数据更新或删除不会被捕获
- **标准**捕获所有类型的数据变更,包括插入、更新和删除。
- **仅追加**:在此模式下,Stream仅包含数据插入记录;不捕获数据更新或删除

Databend 流的设计理念是专注于捕获数据的最终状态。例如,如果您插入一个值然后多次更新它,流只会在被消费之前保留该值的最新状态。以下示例说明了流的样子以及它在两种模式下的工作方式
Databend Stream的设计理念是专注于捕获数据的最终状态。例如,如果你插入一个值然后多次更新它,Stream只保留该值在被消费之前的最新状态。以下示例展示了Stream在两种模式下的外观和工作方式

<StepsWrap>
<StepContent number="1">

#### 创建流以捕获更改
#### 创建Stream以捕获变更

首先创建两个表,然后为每个表创建一个流,使用不同的模式来捕获表的更改
首先创建两个表,然后为每个表创建一个不同模式的Stream,以捕获表的变更

```sql
-- 创建一个表并插入一个值
CREATE TABLE t_standard(a INT);
CREATE TABLE t_append_only(a INT);

-- 创建两个不同模式的流:标准和仅追加
-- 创建两个不同模式的Stream:标准和仅追加
CREATE STREAM s_standard ON TABLE t_standard APPEND_ONLY=false;
CREATE STREAM s_append_only ON TABLE t_append_only APPEND_ONLY=true;
```

您可以使用 [SHOW FULL STREAMS](/sql/sql-commands/ddl/stream/show-streams) 命令查看创建的流及其模式
你可以使用[SHOW FULL STREAMS](/sql/sql-commands/ddl/stream/show-streams)命令查看创建的Stream及其模式

```sql
SHOW FULL STREAMS;
Expand All @@ -47,7 +47,7 @@ SHOW FULL STREAMS;
└─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
```

现在,向每个表插入两个值,并观察流捕获的内容
现在,让我们在每个表中插入两个值,并观察Stream捕获的内容

```sql
-- 插入两个新值
Expand All @@ -73,7 +73,7 @@ SELECT * FROM s_append_only;
└─────────────────────────────────────────────────────────────────────────────────────────────┘
```

上述结果表明,两个流都成功捕获了新的插入。有关结果中流列的详细信息,请参见 [流列](#stream-columns)。现在,让我们更新然后删除一个新插入的值,并检查流捕获的内容是否有差异
上述结果表明,两个Stream都成功捕获了新的插入。有关结果中Stream列的详细信息,请参见[Stream列](#stream-columns)。现在,让我们更新然后删除一个新插入的值,并检查Stream捕获的内容是否有差异

```sql
UPDATE t_standard SET a = 4 WHERE a = 2;
Expand Down Expand Up @@ -107,7 +107,7 @@ SELECT * FROM s_standard;
│ a │ change$action │ change$row_id │ change$is_update │
│ Nullable(Int32) │ Nullable(String) │ Nullable(String) │ Boolean
├─────────────────┼──────────────────┼────────────────────────────────────────┼──────────────────┤
3 │ INSERT │ 1dd5cab0b1b64328a112db89d602ca000001 │ false │
3 │ INSERT │ 1dd5cab0b1b64328a112db89d602ca04000001 │ false │
└────────────────────────────────────────────────────────────────────────────────────────────────┘

SELECT * FROM s_append_only;
Expand All @@ -119,14 +119,14 @@ SELECT * FROM s_append_only;
└─────────────────────────────────────────────────────────────────────────────────────────────┘
```

到目前为止,我们还没有注意到两种模式之间的显著差异,因为我们还没有处理流。所有更改都已合并并表现为 INSERT 操作**流可以通过任务、DML(数据操作语言)操作或带有 [WITH CONSUME](/sql/sql-commands/query-syntax/with-consume)[WITH Stream Hints](/sql/sql-commands/query-syntax/with-stream-hints) 的查询来消费**。消费后,流不包含数据,但可以继续捕获新的更改(如果有)。为了进一步分析差异,让我们继续消费流并检查输出
到目前为止,我们还没有注意到两种模式之间的显著差异,因为我们还没有处理Stream。所有变更都已合并并表现为INSERT操作**Stream可以通过任务、DML(数据操作语言)操作或带有[WITH CONSUME](/sql/sql-commands/query-syntax/with-consume)[WITH Stream Hints](/sql/sql-commands/query-syntax/with-stream-hints)的查询来消费**。消费后,Stream不包含数据,但可以继续捕获新的变更(如果有)。为了进一步分析差异,让我们继续消费Stream并检查输出

</StepContent>
<StepContent number="2">

#### 消费流
#### 消费Stream

让我们创建两个新表,并将流捕获的内容插入其中
让我们创建两个新表,并将Stream捕获的内容插入其中

```sql
CREATE TABLE t_consume_standard(b INT);
Expand All @@ -152,7 +152,7 @@ SELECT * FROM t_consume_append_only;
└─────────────────┘
```

如果您现在查询流,您会发现它们是空的,因为它们已经被消费了。
如果你现在查询Stream,你会发现它们是空的,因为它们已经被消费了。

```sql
-- 空结果
Expand All @@ -165,9 +165,9 @@ SELECT * FROM s_append_only;
</StepContent>
<StepContent number="3">

#### 捕获新更改
#### 捕获新变更

现在,让我们将每个表中的值从 `3` 更新为 `4`然后再次检查它们的流
现在,让我们将每个表中的值从`3`更新为`4`然后再次检查它们的Stream

```sql
UPDATE t_standard SET a = 4 WHERE a = 3;
Expand All @@ -188,11 +188,11 @@ SELECT * FROM s_standard;
SELECT * FROM s_append_only;
```

上述结果显示,标准流将 UPDATE 操作处理为两个动作的组合:一个 DELETE 动作删除旧值`3`和一个 INSERT 动作添加新值`4`)。当将 `3` 更新为 `4` 时,现有值 `3` 必须首先被删除,因为它不再存在于最终状态中,然后插入新值 `4`这种行为反映了标准流如何仅捕获最终更改,将更新表示为同一行的删除(移除旧值)和插入(添加新值)的序列。
上述结果表明,标准Stream将UPDATE操作处理为两个动作的组合:一个DELETE动作删除旧值`3`,一个INSERT动作添加新值`4`)。当将`3`更新为`4`时,必须首先删除现有值`3`,因为它不再存在于最终状态中,然后插入新值`4`这种行为反映了标准Stream如何仅捕获最终变更,将更新表示为同一行的删除(删除旧值)和插入(添加新值)的序列。

另一方面,仅追加流没有捕获任何内容,因为它被设计为仅记录新数据添加(INSERT)并忽略更新或删除
另一方面,仅追加Stream没有捕获任何内容,因为它设计为仅记录新数据添加(INSERT),忽略更新或删除

如果我们现在删除值 `4`,我们可以得到以下结果:
如果我们现在删除值`4`,我们可以得到以下结果:

```sql
DELETE FROM t_standard WHERE a = 4;
Expand All @@ -211,7 +211,10 @@ SELECT * FROM s_standard;
SELECT * FROM s_append_only;
```

我们可以看到,两种流模式都能够捕获插入,以及在流被消费之前对插入值进行的任何后续更新和删除。然而,在消费之后,如果对之前插入的数据进行更新或删除,只有标准流能够捕获这些更改,并将它们记录为 DELETE 和 INSERT 操作。
我们可以看到,两种Stream模式都能够捕获插入,以及在Stream被消费之前对插入值的任何后续更新和删除。然而,消费后,如果对先前插入的数据进行更新或删除,只有标准Stream能够捕获这些变更,并将其记录为DELETE和INSERT操作。

</StepContent>
</StepsWrap>

</StepContent>
</StepsWrap>
Expand All @@ -220,27 +223,27 @@ SELECT * FROM s_append_only;

在 Databend 中,流消费在单语句事务中是事务性的。这意味着:

**成功的事务**如果事务提交成功,流将被消费。例如:
**成功的事务**如果事务提交,流将被消费。例如:

```sql
INSERT INTO table SELECT * FROM stream;
```

如果这个 `INSERT` 事务提交成功,流将被消费。
如果这个 `INSERT` 事务提交,流将被消费。

**失败的事务**:如果事务失败,流将保持不变,并可用于未来的消费
**失败的事务**:如果事务失败,流将保持不变,并可供未来消费

**并发访问**_同一时间只能有一个事务成功消费一个流_。如果有多个事务尝试消费同一个流,只有第一个提交的事务会成功,其他事务将失败。
**并发访问**_同一时间只有一个事务可以成功消费一个流_。如果多个事务尝试消费同一个流,只有第一个提交的事务会成功,其他事务将失败。

### 流的表元数据

**流不存储表的任何数据**。在为表创建流后,Databend 会为表引入特定的隐藏元数据列,用于变更跟踪。这些列包括:
**流不会存储表的任何数据**。在为表创建流后,Databend 会向表中引入特定的隐藏元数据列,用于变更跟踪。这些列包括:

| | 描述 |
|| 描述 |
| ---------------------- | --------------------------------------------------------------------------------- |
| \_origin_version | 标识此行最初创建时的表版本|
| \_origin_block_id | 标识此行之前所属的块 ID。 |
| \_origin_block_row_num | 标识此行之前所属块中的行号|
| \_origin_version | 标识最初创建此行的表版本|
| \_origin_block_id | 标识此行先前所属的块 ID。 |
| \_origin_block_row_num | 标识此行先前所属块中的行号|
| \_row_version | 标识行版本,从 0 开始,每次更新递增 1。 |

要显示这些列的值,请使用 SELECT 语句:
Expand Down Expand Up @@ -286,13 +289,13 @@ FROM

### 流列

您可以使用 SELECT 语句直接查询流并检索跟踪的变更。在查询流时,考虑包含这些隐藏列以获取有关变更的更多详细信息
您可以使用 SELECT 语句直接查询流并获取跟踪的变更。在查询流时,可以考虑包含这些隐藏列以获取有关变更的更多详细信息

| | 描述 |
|| 描述 |
| ---------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| change$action | 变更类型:INSERT 或 DELETE。 |
| change$is_update | 指示 `change$action` 是否是 UPDATE 的一部分。在流中,UPDATE 由 DELETE 和 INSERT 操作的组合表示,此字段设置为 `true`|
| change$row_id | 用于跟踪变更的每一行的唯一标识符|
| change$is_update | 指示 `change$action` 是否为 UPDATE 的一部分。在流中,UPDATE 由 DELETE 和 INSERT 操作的组合表示,此字段设置为 `true`|
| change$row_id | 每行的唯一标识符,用于跟踪变更|

```sql title='示例:'
CREATE TABLE t(a int);
Expand Down Expand Up @@ -322,17 +325,17 @@ SELECT * FROM s;

### 示例:实时跟踪和转换数据

以下示例演示了如何使用流实时捕获和跟踪用户活动
以下示例演示了如何使用流来实时捕获和跟踪用户活动

#### 1. 创建表

示例使用三个表
该示例使用了三个表

- `user_activities` 表记录用户活动。
- `user_profiles` 表存储用户配置文件
- `user_activity_profiles` 表是两个表的组合视图
- `user_profiles` 表存储用户资料
- `user_activity_profiles` 表是这两个表的组合视图

`activities_stream` 表作为流创建,以捕获 `user_activities` 表的实时变更。然后,流被一个查询消费,以使用最新数据更新 `user_activity_profiles` 表。
`activities_stream` 表被创建为一个流,用于捕获 `user_activities` 表的实时变更。然后通过查询消费该流,以使用最新数据更新 `user_activity_profiles` 表。

```sql
-- 创建一个表来记录用户活动
Expand All @@ -342,7 +345,7 @@ CREATE TABLE user_activities (
timestamp TIMESTAMP
);

-- 创建一个表来存储用户配置文件
-- 创建一个表来存储用户资料
CREATE TABLE user_profiles (
user_id INT,
username VARCHAR,
Expand All @@ -355,7 +358,7 @@ INSERT INTO user_profiles VALUES (102, 'Bob', 'San Francisco');
INSERT INTO user_profiles VALUES (103, 'Charlie', 'Los Angeles');
INSERT INTO user_profiles VALUES (104, 'Dana', 'Chicago');

-- 创建一个表来存储用户活动和配置文件的组合视图
-- 创建一个表用于用户活动和资料的组合视图
CREATE TABLE user_activity_profiles (
user_id INT,
username VARCHAR,
Expand All @@ -367,7 +370,7 @@ CREATE TABLE user_activity_profiles (

#### 2. 创建流

`user_activities` 表上创建流以捕获实时变更
`user_activities` 表上创建一个流以捕获实时变更

```sql
CREATE STREAM activities_stream ON TABLE user_activities;
Expand Down Expand Up @@ -398,16 +401,16 @@ FROM
-- 变更数据的源表
activities_stream AS a
JOIN
-- 与用户配置文件数据连接
-- 与用户资料数据连接
user_profiles AS p
ON
a.user_id = p.user_id

-- a.change$action 是一个指示变更类型(Databend 目前仅支持 INSERT)的列
-- a.change$action 是一个指示变更类型的列(目前 Databend 仅支持 INSERT)
WHERE a.change$action = 'INSERT';
```

然后,检查更新的 `user_activity_profiles` 表:
然后,检查更新后的 `user_activity_profiles` 表:

```sql
SELECT
Expand All @@ -426,11 +429,11 @@ FROM
└────────────────────────────────────────────────────────────────────────────────────────────────┘
```

#### 5. 实时数据处理的任务更新
#### 5. 实时数据处理的定时任务更新

为了保持 `user_activity_profiles` 表的最新状态,重要的是定期将其与 `activities_stream` 中的数据同步。此同步应与 `user_activities` 表的更新间隔一致,确保 `user_activity_profiles` 准确反映最新的用户活动和配置文件,以便进行实时数据分析。
为了保持 `user_activity_profiles` 表的实时性,重要的是定期将其与 `activities_stream` 中的数据同步。此同步应与 `user_activities` 表的更新间隔保持一致,以确保 `user_activity_profiles` 准确反映最新的用户活动和资料,以便进行实时数据分析。

Databend 的 `TASK` 命令(目前处于私有预览阶段)可用于定义一个每分钟或每秒更新 `user_activity_profiles` 表的任务
Databend 的 `TASK` 命令(目前处于私有预览阶段)可用于定义一个任务,每分钟或每秒更新 `user_activity_profiles`

```sql
-- 在 Databend 中定义一个任务
Expand All @@ -442,7 +445,7 @@ WHEN stream_status('activities_stream') AS
-- 向 user_activity_profiles 插入新记录
INSERT INTO user_activity_profiles
SELECT
-- 基于 user_id activities_stream user_profiles 连接
-- 基于 user_id 连接 activities_stream user_profiles
a.user_id, p.username, p.location, a.activity, a.timestamp
FROM
activities_stream AS a
Expand Down
Loading

0 comments on commit b967a81

Please sign in to comment.