1
1
# DolphinDB 流计算在物联网的应用:实时检测传感器状态变化
2
2
3
- 工业物联网领域,能否对从传感器采集到的包括湿度、温度、压力、液位、流速等多方面的海量数据进行快速的实时处理,对各种工业智能制造应用至关重要。 DolphinDB 提供了流数据表 (stream table) 和流计算引擎用于实时数据处理,助力智能制造。 [ DolphinDB 流计算引擎实现传感器数据异常检测] ( https://gitee.com/dolphindb/Tutorials_CN/blob/master/iot_anomaly_detection.md ) 一文介绍了怎么用内置的异常检测引擎 (Anomaly Detection Engine) 和自定义计算引擎实现异常检测的需求。目前 DolphinDB 内置了更多的计算引擎,本文将介绍如何用响应式状态引擎和会话窗口引擎实现传感器状态变化的实时监测。
4
-
5
- - [ DolphinDB 流计算在物联网的应用:实时检测传感器状态变化] ( #dolphindb-流计算在物联网的应用实时检测传感器状态变化 )
6
- - [1. 应用需求](#1-应用需求)
7
- - [2. 实验环境准备](#2-实验环境准备)
8
- - [3. 设计思路](#3-设计思路)
9
- - [4. 实现步骤](#4-实现步骤)
10
- - [4.1 定义输入输出流数据表](#41-定义输入输出流数据表)
11
- - [4.2 创建响应式状态引擎实现传感器状态变化实时监测](#42-创建响应式状态引擎实现传感器状态变化实时监测)
12
- - [4.3 创建会话窗口引擎实现传感器丢失数据实时报警](#43-创建会话窗口引擎实现传感器丢失数据实时报警)
13
- - [4.4 订阅流数据](#44-订阅流数据)
14
- - [4.5 从 MQTT 服务器接收数据](#45-从-mqtt-服务器接收数据)
15
- - [5. 模拟写入与验证](#5-模拟写入与验证)
16
- - [6. 总结](#6-总结)
17
- - [附录](#附录)
3
+ 工业物联网领域,能否对从传感器采集到的包括湿度、温度、压力、液位、流速等多方面的海量数据进行快速的实时处理,对各种工业智能制造应用至关重要。DolphinDB 提供了流数据表 (stream table) 和流计算引擎用于实时数据处理,助力智能制造。 [ DolphinDB 流计算引擎实现传感器数据异常检测] ( https://gitee.com/dolphindb/Tutorials_CN/blob/master/iot_anomaly_detection.md ) 一文介绍了怎么用内置的异常检测引擎 (Anomaly Detection Engine) 和自定义计算引擎实现异常检测的需求。目前 DolphinDB 内置了更多的计算引擎,本文将介绍如何用响应式状态引擎和会话窗口引擎实现传感器状态变化的实时监测。
4
+
5
+ - [ 1. 应用需求] ( #1-应用需求 )
6
+ - [ 2. 实验环境准备] ( #2-实验环境准备 )
7
+ - [ 3. 设计思路] ( #3-设计思路 )
8
+ - [ 4. 实现步骤] ( #4-实现步骤 )
9
+ - [4.1 定义输入输出流数据表](#41-定义输入输出流数据表)
10
+ - [4.2 创建响应式状态引擎实现传感器状态变化实时监测](#42-创建响应式状态引擎实现传感器状态变化实时监测)
11
+ - [4.3 创建会话窗口引擎实现传感器丢失数据实时报警](#43-创建会话窗口引擎实现传感器丢失数据实时报警)
12
+ - [4.4 订阅流数据](#44-订阅流数据)
13
+ - [4.5 从 MQTT 服务器接收数据](#45-从-mqtt-服务器接收数据)
14
+ - [ 5. 模拟写入与验证] ( #5-模拟写入与验证 )
15
+ - [ 6. 总结] ( #6-总结 )
16
+ - [ 附录] ( #附录 )
18
17
19
18
## 1. 应用需求
20
19
21
- 假定有一个监控系统,对所有传感器每5秒钟采集1次数据 ,并将采集后的数据以 json 格式写入 mqtt 服务器,典型样本数据如下所示:
20
+ 假定有一个监控系统,对所有传感器每 5 秒钟采集 1 次数据 ,并将采集后的数据以 json 格式写入 mqtt 服务器,典型样本数据如下所示:
22
21
23
22
```
24
23
tag ts value
@@ -34,10 +33,10 @@ motor.C17156G.m146 2022.11.05T15:32:15.734 0
34
33
motor.C17156B.m1 2022.11.05T15:32:17.750 1
35
34
```
36
35
37
- 其中 tag 是传感器标签,ts 是采集时间戳,value 是设备测量值,取值范围是0 ~ 4,分别表示设备的5种状态(0表示待运行,1表示运行, 2表示节能, 3表示阻塞,4表示过载 )。现有以下实时检测需求:
36
+ 其中 tag 是传感器标签,ts 是采集时间戳,value 是设备测量值,取值范围是 0 ~ 4,分别表示设备的 5 种状态(0 表示待运行,1 表示运行,2 表示节能,3 表示阻塞,4 表示过载 )。现有以下实时检测需求:
38
37
39
38
* 传感器状态变化监控:当监测到当前记录的传感器测量值与该传感器前一条记录的测量值不一样,即状态有变化时,输出这条记录,若不变就丢弃这条记录。
40
- * 传感器数据丢失告警:若传感器数据丢失,即30秒钟内某个传感器没有采集到数据 ,系统告警。报警方式为,在侦测到传感器数据采集异常后,向一个流数据表中写一条记录。
39
+ * 传感器数据丢失告警:若传感器数据丢失,即 30 秒钟内某个传感器没有采集到数据 ,系统告警。报警方式为,在侦测到传感器数据采集异常后,向一个流数据表中写一条记录。
41
40
42
41
## 2. 实验环境准备
43
42
@@ -48,11 +47,11 @@ motor.C17156B.m1 2022.11.05T15:32:17.750 1
48
47
* CPU 类型:Intel(R) Core(TM) i7-6700HQ CPU @ 2.60GHz
49
48
* 逻辑 CPU 总数:4
50
49
* 内存:32 GB
51
- * OS:64位 Ubuntu 20.04
50
+ * OS:64 位 Ubuntu 20.04
52
51
53
52
* DolphinDB server 部署
54
53
55
- * server 版本:2.00.8 Linux 64 JIT ,社区版
54
+ * server 版本:2.00.8 Linux 64 JIT,社区版
56
55
* 部署模式:单节点模式
57
56
58
57
* DolphinDB GUI:1.30.14 版本
@@ -61,7 +60,7 @@ motor.C17156B.m1 2022.11.05T15:32:17.750 1
61
60
62
61
## 3. 设计思路
63
62
64
- DolphinDB 的流计算框架目前已提供时序聚合引擎、横截面聚合引擎、异常检测引擎、会话窗口引擎和响应式状态引擎等10余种计算引擎应对不同计算场景 。本文主要介绍如何用响应式状态引擎和会话窗口引擎实现传感器状态变化的实时监测。
63
+ DolphinDB 的流计算框架目前已提供时序聚合引擎、横截面聚合引擎、异常检测引擎、会话窗口引擎和响应式状态引擎等 10 余种计算引擎应对不同计算场景 。本文主要介绍如何用响应式状态引擎和会话窗口引擎实现传感器状态变化的实时监测。
65
64
66
65
* 会话窗口引擎
67
66
@@ -73,13 +72,13 @@ DolphinDB 流数据引擎所计算的因子可分为无状态因子与有状态
73
72
74
73
对于第一个需求即传感器状态变化,可使用响应式状态引擎。响应式状态引擎可以设置过滤条件,通过过滤条件判断当前记录的状态值与前一条记录相同,只有符合过滤条件的结果才被输出。
75
74
76
- 对于第二个需求即检测是否有数据丢失,可使用会话窗口引擎。对于每个传感器,会话窗口收到该传感器某条数据之后,若在30秒内仍未收到该传感器的下一条新数据 ,则认为该窗口结束,输出报警。
75
+ 对于第二个需求即检测是否有数据丢失,可使用会话窗口引擎。对于每个传感器,会话窗口收到该传感器某条数据之后,若在 30 秒内仍未收到该传感器的下一条新数据 ,则认为该窗口结束,输出报警。
77
76
78
77
## 4. 实现步骤
79
78
80
79
### 4.1 定义输入输出流数据表
81
80
82
- 首先,定义一个流数据表用于接收实时采集的传感器数据,表结构包含三列,即标签 tag、时间 ts 和标签值 value。通过 [ enableTableShareAndPersistence] ( https://www.dolphindb.cn/cn/help/FunctionsandCommands/CommandsReferences/e/enableTableShareAndPersistence.html ) 函数共享流数据表并持久化到硬盘上。通过 cacheSize 参数将内存中可保存的最大数据量设定为10万行 。代码如下:
81
+ 首先,定义一个流数据表用于接收实时采集的传感器数据,表结构包含三列,即标签 tag、时间 ts 和标签值 value。通过 [ enableTableShareAndPersistence] ( https://www.dolphindb.cn/cn/help/FunctionsandCommands/CommandsReferences/e/enableTableShareAndPersistence.html ) 函数共享流数据表并持久化到硬盘上。通过 cacheSize 参数将内存中可保存的最大数据量设定为 10 万行 。代码如下:
83
82
84
83
```
85
84
stream01=streamTable(100000:0,`tag`ts`value,[SYMBOL,TIMESTAMP, INT])
@@ -102,7 +101,7 @@ enableTableShareAndPersistence(table=out2,tableName=`outputSt2,asynWrite=false,c
102
101
103
102
### 4.2 创建响应式状态引擎实现传感器状态变化实时监测
104
103
105
- 响应式状态引擎中,设置分组列(keyColumn)为传感器标签 tag,2个计算指标为 ` ts 和 value ` ,表示原样输出。需要注意的是 filter 参数的设置:` <value!=prev(value) && prev(value)!=NULL> ` ,这里以元代码的形式表示过滤条件。只有符合过滤条件的结果,即 value 值与该分组的上一个 value 值不相同才会被输出到通过 outputTable 设置的输出表,其中 ` prev(value)!=NULL ` 表示不输出每个分组的第一条记录。参考 DolphinDB 用户手册中 createReactiveStateEngine 页面内容完成对其他参数的设置。代码如下:
104
+ 响应式状态引擎中,设置分组列(keyColumn)为传感器标签 tag,2 个计算指标为 ` ts 和 value ` ,表示原样输出。需要注意的是 filter 参数的设置:` <value!=prev(value) && prev(value)!=NULL> ` ,这里以元代码的形式表示过滤条件。只有符合过滤条件的结果,即 value 值与该分组的上一个 value 值不相同才会被输出到通过 outputTable 设置的输出表,其中 ` prev(value)!=NULL ` 表示不输出每个分组的第一条记录。参考 DolphinDB 用户手册中 createReactiveStateEngine 页面内容完成对其他参数的设置。代码如下:
106
105
107
106
```
108
107
reactivEngine = createReactiveStateEngine(name=`reactivEngine, metrics=<[ts, value]>, dummyTable=stream01,
@@ -111,7 +110,7 @@ reactivEngine = createReactiveStateEngine(name=`reactivEngine, metrics=<[ts, val
111
110
112
111
### 4.3 创建会话窗口引擎实现传感器丢失数据实时报警
113
112
114
- 会话窗口引擎中,设置 keyColumn(分组列)为传感器标签 tag,timeColumn(时间列)为 ts 。检测需求是30秒内无数据 ,所以 sessionGap 为30000 (单位为毫秒,同 ts 列),表示收到某条数据后经过该时间的等待仍无新数据到来,就终止当前窗口。设置 useSessionStartTime 为 false ,表示输出表中的时刻为数据窗口结束时刻,即每个窗口中最后一条数据的时刻 + * sessionGap* 。参考 DolphinDB 用户手册中 [ createSessionWindowEngine] ( https://www.dolphindb.cn/cn/help/FunctionsandCommands/FunctionReferences/c/createSessionWindowEngine.html ) 页面内容完成对其他参数的设置。代码如下:
113
+ 会话窗口引擎中,设置 keyColumn(分组列)为传感器标签 tag,timeColumn(时间列)为 ts。检测需求是 30 秒内无数据 ,所以 sessionGap 为 30000 (单位为毫秒,同 ts 列),表示收到某条数据后经过该时间的等待仍无新数据到来,就终止当前窗口。设置 useSessionStartTime 为 false,表示输出表中的时刻为数据窗口结束时刻,即每个窗口中最后一条数据的时刻 + * sessionGap* 。参考 DolphinDB 用户手册中 [ createSessionWindowEngine] ( https://www.dolphindb.cn/cn/help/FunctionsandCommands/FunctionReferences/c/createSessionWindowEngine.html ) 页面内容完成对其他参数的设置。代码如下:
115
114
116
115
```
117
116
swEngine = createSessionWindowEngine(name = "swEngine", sessionGap = 30000, metrics = < last(value)>,
@@ -140,7 +139,7 @@ subscribeTable(tableName="inputSt", actionName="monitor", offset=0,
140
139
* 再次,参数 handlerNeedMsgId 必须指定为 true。
141
140
* 更详细的说明请参阅 [ 流数据教程] ( https://gitee.com/dolphindb/Tutorials_CN/blob/master/streaming_tutorial.md#43-%E5%BF%AB%E7%85%A7%E6%9C%BA%E5%88%B6 ) 第 4.3 节或用户手册中会话窗口引擎、响应式状态引擎的说明。
142
141
143
- 2 . 若设备极多,数据采集频率很高,可能需要处理大量消息。这时可在 DolphinDB 消息订阅函数 [ subscribeTable] ( https://www.dolphindb.cn/cn/help/200/FunctionsandCommands/FunctionReferences/s/subscribeTable.html ) 中指定可选参数 filter 与 hash,让多个订阅客户端并行处理消息。相关详细说明请参阅 [ 流数据教程] ( https://gitee.com/dolphindb/Tutorials_CN/blob/master/streaming_tutorial.md#42-%E5%B9%B6%E8%A1%8C%E5%A4%84%E7%90%86 ) 第4.2节或用户手册中 [ subscribeTable] ( https://www.dolphindb.cn/cn/help/200/FunctionsandCommands/FunctionReferences/s/subscribeTable.html ) 和 [ setStreamTableFilterColumn] ( https://www.dolphindb.cn/cn/help/200/FunctionsandCommands/CommandsReferences/s/setStreamTableFilterColumn.html ) 的说明。
142
+ 2 . 若设备极多,数据采集频率很高,可能需要处理大量消息。这时可在 DolphinDB 消息订阅函数 [ subscribeTable] ( https://www.dolphindb.cn/cn/help/200/FunctionsandCommands/FunctionReferences/s/subscribeTable.html ) 中指定可选参数 filter 与 hash,让多个订阅客户端并行处理消息。相关详细说明请参阅 [ 流数据教程] ( https://gitee.com/dolphindb/Tutorials_CN/blob/master/streaming_tutorial.md#42-%E5%B9%B6%E8%A1%8C%E5%A4%84%E7%90%86 ) 第 4.2 节或用户手册中 [ subscribeTable] ( https://www.dolphindb.cn/cn/help/200/FunctionsandCommands/FunctionReferences/s/subscribeTable.html ) 和 [ setStreamTableFilterColumn] ( https://www.dolphindb.cn/cn/help/200/FunctionsandCommands/CommandsReferences/s/setStreamTableFilterColumn.html ) 的说明。
144
143
145
144
### 4.5 从 MQTT 服务器接收数据
146
145
@@ -155,7 +154,7 @@ mqtt::subscribe(host, port, topic, sp, inputSt)
155
154
156
155
## 5. 模拟写入与验证
157
156
158
- 附录中的样本文件包含了46个 tag 的约18分钟的数据,共9985条记录 ,下列代码可把样本文件中的数据推送并写入 MQTT 服务器:
157
+ 附录中的样本文件包含了 46 个 tag 的约 18 分钟的数据,共 9985 条记录 ,下列代码可把样本文件中的数据推送并写入 MQTT 服务器:
159
158
160
159
```
161
160
t=loadText(getHomeDir()+"/deviceState.csv")// 加载样本文件到内存,目录需根据实际情况修改
@@ -164,7 +163,7 @@ mqtt::subscribe(host, port, topic, sp, inputSt)
164
163
submitJob("submit_pub1", "submit_p1", publishTableData{host,topic,f, batchsize,t})
165
164
```
166
165
167
- 运行后,outputSt1 产生409条记录 ,用下列代码可验证结果是否正确:
166
+ 运行后,outputSt1 产生 409 条记录 ,用下列代码可验证结果是否正确:
168
167
169
168
```
170
169
t=loadText(getHomeDir()+"/deviceState.csv")
@@ -173,15 +172,15 @@ mqtt::subscribe(host, port, topic, sp, inputSt)
173
172
assert eqObj(t1.values(),t2.values())==true
174
173
```
175
174
176
- outputSt2 产生如下所示2条记录 :
175
+ outputSt2 产生如下所示 2 条记录 :
177
176
178
177
```
179
178
ts tag lastValue
180
179
2022.11.05T15:45:37.750 motor.C17156B.m1 2
181
180
2022.11.05T15:49:52.750 motor.C17156B.m1 0
182
181
```
183
182
184
- 通过下列示例代码可发现样本中有2次丢了30秒以上的数据 :
183
+ 通过下列示例代码可发现样本中有 2 次丢了 30 秒以上的数据 :
185
184
186
185
```
187
186
t=loadText(getHomeDir()+"/deviceState.csv")
0 commit comments