diff --git a/plugins/inputs/cnosdb_subscription/README_DEV.md b/plugins/inputs/cnosdb_subscription/README_DEV.md new file mode 100644 index 0000000000000..a3b7589c04bd7 --- /dev/null +++ b/plugins/inputs/cnosdb_subscription/README_DEV.md @@ -0,0 +1,176 @@ +# Cnos-Telegraf + +> 提交 PR 至 telegraf 官方仓库时,请一定注意,不要带上本文件。 + +CnosDB-Telegraf 基于 Telegraf 进行开发,增加了一些功能与插件。 + +## 原版 Telegraf 文档 + +[README.md](../README.md) + +## 分支介绍 + +### 新增功能 + +- `feature/doc` - 文档。 +- `feature/docker` - 构建 telegraf 为 docker 镜像。 +- `feature/high_priority_channel` - 使 input 插件能够将 output 插件的处理结果返回给调用者。 +- `feature/cnosdb_subscription` - 添加 input 插件用于处理 CnosDB 订阅消息的报文。 +- `feature/opentsdb_json` - 添加 input 插件用于处理 JSON 格式的 OpenTSDB 写请求的报文。 + +### cnosdb + +## 如何更新版本 + +确保 git 记录了官方仓库的信息。 + +```shell +git remote add -t master telegraf git@github.com:influxdata/telegraf.git +``` + +更新 master 分支。 + +```shell +git fetch --all --tags --prune --jobs=10 +## 按提交时间获得 tag 列表,找到最新的 tag,假设最新的 tag 为 v1.32.2 +git tag --sort=-committerdate +## 获得最新的 tag 对应的 commit id +#git rev-list -n 1 v1.32.2 +git checkout master +git reset --hard v1.32.2 +``` + +对所有的 feature 分支执行变基。 + +```shell +git checkout feature/docker +git rebase master + +git checkout feature/opentsdb_json +git rebase master + +git checkout feature/high_priority_channel +git rebase master + +## feature/cnosdb_subscription 依赖 feature/high_priority_channel +git checkout feature/cnosdb_subscription +git rebase feature/high_priority_channel +``` + +使用 `git cherry-pick` 将所有的 `feature/` 的功能合并至 `cnosdb/` + +```shell +git checkout master +git checkout -b cnosdb/main/$(date +"%Y%m%d_%H%M%S") +git cherry-pick $(git rev-list -n 1 feature/docker) +git cherry-pick $(git rev-list -n 1 feature/high_priority_channel) +git cherry-pick $(git rev-list -n 1 feature/opentsdb_json) +git cherry-pick $(git rev-list -n 1 feature/cnosdb_subscription) +git cherry-pick $(git rev-list -n 1 feature/doc) +``` + +## Cnos-Telegraf 的改动说明 + +### Parser Plugin + +增加 Parser 插件 opentsdb_json,用于采集 OpenTSDB 的 JSON 格式的写入请求。 + +#### OpenTSDB JSON + +通过使用 Input 插件 http_listener_v2 并配置 `data_format` 为 `"opentsdb"`,将能够解析 OpenTSDB 格式的写入请求。 + +```toml +[[inputs.http_listener_v2]] +service_address = ":8080" +paths = ["/api/put"] +methods = ["POST", "PUT"] +data_format = "opentsdb_json" +``` + +### Input Plugin + +#### CnosDB 订阅 + +增加 Input 插件 cnosdb_subscription,用于搜集 CnosDB 订阅功能推送的数据,搭配 Output 插件即可实现异构数据同步。 + +```toml +[[inputs.cnosdb_subscription]] +service_address = ":8803" +``` + +- **配置介绍** + +| 参数 | 说明 | +|-----------------|------| +| service_address | 监听端口 | + +#### 通用参数 + +增加配置参数 high_priority_io,用于开启端到端模式。 + +当设置为 true 时,写入的数据将立即发送到 Output 插件,并根据 Output 插件的返回参数来决定返回值。 + +```toml +[[inputs.http_listener_v2]] +service_address = ":8080" +paths = ["/api/put"] +methods = ["POST", "PUT"] +data_format = "opentsdb" +high_priority_io = true +``` + +## 构建 + +1. [安装 Go](https://golang.org/doc/install),版本要求 >=1.22 +2. 从 Github 克隆仓库: + + ```shell + git clone https://github.com/cnosdb/cnos-telegraf.git + ``` + +3. 检出与 CnosDB 对应的版本,在仓库目录下执行 `make build` + + ```shell + cd cnos-telegraf + # 如果为了适配 cnosdb/2.4 版本 + git checkout cnosdb/2.4 + make build + ``` + +## 启动 + +执行以下指令,查看用例: + +```shell +telegraf --help +``` + +### 生成一份标准的 telegraf 配置文件 + +```shell +telegraf config > telegraf.conf +``` + +### 生成一份 telegraf 配置文件,仅包含 cpu 指标采集 & influxdb 输出两个插件 + +```shell +telegraf config --section-filter agent:inputs:outputs --input-filter cpu --output-filter influxdb +``` + +### 运行 telegraf 但是将采集指标输出到标准输出 + +```shell +telegraf --config telegraf.conf --test +``` + +### 运行 telegraf 并通过配置文件来管理加载的插件 + +```shell +telegraf --config telegraf.conf +``` + +### 运行 telegraf,仅加载 cpu & memory 指标采集,和 influxdb 输出插件 + +```shell +telegraf --config telegraf.conf --input-filter cpu:mem --output-filter influxdb +``` diff --git a/plugins/inputs/cnosdb_subscription/cnosdb/generated/kv_service/kv_service.pb.go b/plugins/inputs/cnosdb_subscription/cnosdb/generated/kv_service/kv_service.pb.go index 63995840fd434..f503f2ae1f872 100644 --- a/plugins/inputs/cnosdb_subscription/cnosdb/generated/kv_service/kv_service.pb.go +++ b/plugins/inputs/cnosdb_subscription/cnosdb/generated/kv_service/kv_service.pb.go @@ -202,9 +202,13 @@ type SubscriptionRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Precision uint32 `protobuf:"varint,1,opt,name=precision,proto3" json:"precision,omitempty"` - TableSchema []byte `protobuf:"bytes,2,opt,name=table_schema,json=tableSchema,proto3" json:"table_schema,omitempty"` - RecordData []byte `protobuf:"bytes,3,opt,name=record_data,json=recordData,proto3" json:"record_data,omitempty"` + // Deprecated: Marked as deprecated in kv_service.proto. + Tenant string `protobuf:"bytes,1,opt,name=tenant,proto3" json:"tenant,omitempty"` + // Deprecated: Marked as deprecated in kv_service.proto. + Db string `protobuf:"bytes,2,opt,name=db,proto3" json:"db,omitempty"` + TableSchema []byte `protobuf:"bytes,3,opt,name=table_schema,json=tableSchema,proto3" json:"table_schema,omitempty"` + RecordData []byte `protobuf:"bytes,4,opt,name=record_data,json=recordData,proto3" json:"record_data,omitempty"` + Precision uint32 `protobuf:"varint,5,opt,name=precision,proto3" json:"precision,omitempty"` } func (x *SubscriptionRequest) Reset() { @@ -239,11 +243,20 @@ func (*SubscriptionRequest) Descriptor() ([]byte, []int) { return file_kv_service_proto_rawDescGZIP(), []int{3} } -func (x *SubscriptionRequest) GetPrecision() uint32 { +// Deprecated: Marked as deprecated in kv_service.proto. +func (x *SubscriptionRequest) GetTenant() string { if x != nil { - return x.Precision + return x.Tenant } - return 0 + return "" +} + +// Deprecated: Marked as deprecated in kv_service.proto. +func (x *SubscriptionRequest) GetDb() string { + if x != nil { + return x.Db + } + return "" } func (x *SubscriptionRequest) GetTableSchema() []byte { @@ -260,6 +273,13 @@ func (x *SubscriptionRequest) GetRecordData() []byte { return nil } +func (x *SubscriptionRequest) GetPrecision() uint32 { + if x != nil { + return x.Precision + } + return 0 +} + // CnosDB subscription v4 message. type SubscriptionResponse struct { state protoimpl.MessageState @@ -321,30 +341,33 @@ var file_kv_service_proto_rawDesc = []byte{ 0x57, 0x72, 0x69, 0x74, 0x65, 0x50, 0x6f, 0x69, 0x6e, 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x73, 0x5f, 0x6e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x70, 0x6f, 0x69, 0x6e, - 0x74, 0x73, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x22, 0x77, 0x0a, 0x13, 0x53, 0x75, 0x62, 0x73, - 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, - 0x1c, 0x0a, 0x09, 0x70, 0x72, 0x65, 0x63, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x0d, 0x52, 0x09, 0x70, 0x72, 0x65, 0x63, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x21, 0x0a, - 0x0c, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, - 0x12, 0x1f, 0x0a, 0x0b, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, - 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x44, 0x61, 0x74, - 0x61, 0x22, 0x16, 0x0a, 0x14, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, - 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0xc1, 0x01, 0x0a, 0x0b, 0x54, 0x53, - 0x4b, 0x56, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x54, 0x0a, 0x0b, 0x57, 0x72, 0x69, - 0x74, 0x65, 0x50, 0x6f, 0x69, 0x6e, 0x74, 0x73, 0x12, 0x1e, 0x2e, 0x6b, 0x76, 0x5f, 0x73, 0x65, - 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x50, 0x6f, 0x69, 0x6e, 0x74, - 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x6b, 0x76, 0x5f, 0x73, 0x65, - 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x50, 0x6f, 0x69, 0x6e, 0x74, - 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, - 0x5c, 0x0a, 0x11, 0x57, 0x72, 0x69, 0x74, 0x65, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, - 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1f, 0x2e, 0x6b, 0x76, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, - 0x65, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x6b, 0x76, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, + 0x74, 0x73, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x22, 0xa7, 0x01, 0x0a, 0x13, 0x53, 0x75, 0x62, + 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x12, 0x1a, 0x0a, 0x06, 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x42, 0x02, 0x18, 0x01, 0x52, 0x06, 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x12, 0x12, 0x0a, 0x02, + 0x64, 0x62, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x42, 0x02, 0x18, 0x01, 0x52, 0x02, 0x64, 0x62, + 0x12, 0x21, 0x0a, 0x0c, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x53, 0x63, 0x68, + 0x65, 0x6d, 0x61, 0x12, 0x1f, 0x0a, 0x0b, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x5f, 0x64, 0x61, + 0x74, 0x61, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, + 0x44, 0x61, 0x74, 0x61, 0x12, 0x1c, 0x0a, 0x09, 0x70, 0x72, 0x65, 0x63, 0x69, 0x73, 0x69, 0x6f, + 0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x09, 0x70, 0x72, 0x65, 0x63, 0x69, 0x73, 0x69, + 0x6f, 0x6e, 0x22, 0x16, 0x0a, 0x14, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, + 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0xc1, 0x01, 0x0a, 0x0b, 0x54, + 0x53, 0x4b, 0x56, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x54, 0x0a, 0x0b, 0x57, 0x72, + 0x69, 0x74, 0x65, 0x50, 0x6f, 0x69, 0x6e, 0x74, 0x73, 0x12, 0x1e, 0x2e, 0x6b, 0x76, 0x5f, 0x73, + 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x50, 0x6f, 0x69, 0x6e, + 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x6b, 0x76, 0x5f, 0x73, + 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x50, 0x6f, 0x69, 0x6e, + 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, + 0x12, 0x5c, 0x0a, 0x11, 0x57, 0x72, 0x69, 0x74, 0x65, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, + 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1f, 0x2e, 0x6b, 0x76, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x0e, 0x5a, - 0x0c, 0x2e, 0x3b, 0x6b, 0x76, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x62, 0x06, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x6b, 0x76, 0x5f, 0x73, 0x65, 0x72, 0x76, + 0x69, 0x63, 0x65, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x0e, + 0x5a, 0x0c, 0x2e, 0x3b, 0x6b, 0x76, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/plugins/inputs/cnosdb_subscription/cnosdb/kv_service_impl.go b/plugins/inputs/cnosdb_subscription/cnosdb/kv_service_impl.go index 72a1e9578fc79..29881e8a3cab2 100644 --- a/plugins/inputs/cnosdb_subscription/cnosdb/kv_service_impl.go +++ b/plugins/inputs/cnosdb_subscription/cnosdb/kv_service_impl.go @@ -152,6 +152,7 @@ func (s TSKVServiceServerImpl) WriteSubscription(server kv_service.TSKVService_W var tableSchema cnosdb_v4.TskvTableSchema if err := json.Unmarshal(req.TableSchema, &tableSchema); err != nil { + s.accumulator.AddError(fmt.Errorf("failed to parse TskvTableSchema: %w", err)) return server.Send(&kv_service.SubscriptionResponse{}) } diff --git a/plugins/inputs/cnosdb_subscription/cnosdb/v4/tskv_table_schema.go b/plugins/inputs/cnosdb_subscription/cnosdb/v4/tskv_table_schema.go index 32c6c51cb5c2d..43d454dff1aa7 100644 --- a/plugins/inputs/cnosdb_subscription/cnosdb/v4/tskv_table_schema.go +++ b/plugins/inputs/cnosdb_subscription/cnosdb/v4/tskv_table_schema.go @@ -1,9 +1,9 @@ package v4 -type ColumnType uint32 +type ColumnTypeCode uint32 const ( - ColumnTypeUnknown ColumnType = iota + ColumnTypeUnknown ColumnTypeCode = iota ColumnTypeTag ColumnTypeTime ColumnTypeFieldUnknown @@ -15,10 +15,10 @@ const ( ColumnTypeFieldGeometry ) -type TimeUnit uint32 +type TimeUnitCode uint32 const ( - TimeUnitUnknown TimeUnit = iota + TimeUnitUnknown TimeUnitCode = iota TimeUnitSecond TimeUnitMillisecond TimeUnitMicrosecond @@ -26,25 +26,26 @@ const ( ) type TskvTableSchema struct { - Tenant string `json:"tenant"` - Db string `json:"db"` - Name string `json:"name"` - SchemaVersion uint64 `json:"schema_version"` - NextColumnID uint32 `json:"next_column_id"` - Columns []TableColumn `json:"columns"` - ColumnsIndex map[string]uint32 `json:"columns_index"` + // Tenant string `json:"tenant"` + // Db string `json:"db"` + Name string `json:"name"` + // SchemaId uint64 `json:"schema_id"` // v2.4.0 field + // SchemaVersion uint64 `json:"schema_version"` // v2.4.1 field + // NextColumnID uint32 `json:"next_column_id"` + Columns []TableColumn `json:"columns"` + // ColumnsIndex map[string]uint32 `json:"columns_index"` } type TableColumn struct { ID uint64 `json:"id"` Name string `json:"name"` ColumnType interface{} `json:"column_type"` - Encoding interface{} `json:"encoding"` + // Encoding interface{} `json:"encoding"` } type ColumnTypeUnited struct { - ColumnType ColumnType - TimeUnit TimeUnit + ColumnType ColumnTypeCode + TimeUnit TimeUnitCode } func (c *TableColumn) GetColumnTypeUnited() ColumnTypeUnited { @@ -56,6 +57,54 @@ func (c *TableColumn) GetColumnTypeUnited() ColumnTypeUnited { ColumnType: ColumnTypeTag, TimeUnit: TimeUnitUnknown, } + } else { + // In cnosdb-v2.4.0, columnType is string + // After cnosdb-v2.4.0, columnType is string or object + columnTypeCode := ColumnTypeUnknown + timeUnitCode := TimeUnitUnknown + switch columnType { + case "TAG_STRING": + // "column_type": "TAG_STRING" + return ColumnTypeUnited{ + ColumnType: ColumnTypeTag, + TimeUnit: TimeUnitUnknown, + } + case "FIELD_STRING": + // "column_type": "FIELD_STRING" + columnTypeCode = ColumnTypeFieldString + case "FIELD_BIGINT": + // "column_type": "FIELD_BIGINT"" + columnTypeCode = ColumnTypeFieldInteger + case "FIELD_BIGINT UNSIGNED": + // "column_type": "FIELD_BIGINT UNSIGNED"" + columnTypeCode = ColumnTypeFieldUnsigned + case "FIELD_DOUBLE": + // "column_type": "FIELD_STRING" + columnTypeCode = ColumnTypeFieldFloat + case "FIELD_BOOLEAN": + // "column_type": "FIELD_BOOLEAN"" + columnTypeCode = ColumnTypeFieldBoolean + case "TIME_TIMESTAMP(SECOND)": + // "column_type": "TIME_TIMESTAMP(SECOND)"" + columnTypeCode = ColumnTypeTime + timeUnitCode = TimeUnitSecond + case "TIME_TIMESTAMP(MILLISECOND)": + // "column_type": "TIME_TIMESTAMP(MILLISECOND)"" + columnTypeCode = ColumnTypeTime + timeUnitCode = TimeUnitMillisecond + case "TIME_TIMESTAMP(MICROSECOND)": + // "column_type": "TIME_TIMESTAMP(MICROSECOND)"" + columnTypeCode = ColumnTypeTime + timeUnitCode = TimeUnitMicrosecond + case "TIME_TIMESTAMP(NANOSECOND)": + // "column_type": "TIME_TIMESTAMP(NANOSECOND)"" + columnTypeCode = ColumnTypeTime + timeUnitCode = TimeUnitNanosecond + } + return ColumnTypeUnited{ + ColumnType: columnTypeCode, + TimeUnit: timeUnitCode, + } } case map[string]interface{}: if timeUnitObj := columnType["Time"]; timeUnitObj != nil { diff --git a/plugins/inputs/cnosdb_subscription/protocol/kv_service.proto b/plugins/inputs/cnosdb_subscription/protocol/kv_service.proto index 4f2dc3213cf6a..1d1a2a31a332d 100644 --- a/plugins/inputs/cnosdb_subscription/protocol/kv_service.proto +++ b/plugins/inputs/cnosdb_subscription/protocol/kv_service.proto @@ -24,9 +24,11 @@ message WritePointsResponse { // CnosDB subscription v4 message. message SubscriptionRequest { - uint32 precision = 1; - bytes table_schema = 2; - bytes record_data = 3; + string tenant = 1; + string db = 2; + bytes table_schema = 3; + bytes record_data = 4; + uint32 precision = 5; } // CnosDB subscription v4 message.