@@ -77,7 +77,7 @@ maxPubConnections=4
77
77
78
78
4.3 实现步骤
79
79
80
- 首先我们定义一个sensorTemp表用于接收实时采集的温度数据,为了避免数据表占用过多内存, 我们使用enableTablePersistence函数对sensorTemp表做持久化。
80
+ 首先我们定义一个sensorTemp流数据表用于接收实时采集的温度数据, 我们使用enableTablePersistence函数对sensorTemp表做持久化,这里我们对流数据表设置100万条记录的阈值,每当记录数超出100万之后,系统会做一次持久化处理 。
81
81
```
82
82
share streamTable(1000000:0,`hardwareId`ts`temp1`temp2`temp3,[INT,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE]) as sensorTemp
83
83
enableTablePersistence(sensorTemp, true, false, 1000000)
@@ -92,25 +92,27 @@ db = database("dfs://iotDemoDB",COMPO,[db1,db2])
92
92
dfsTable = db.createPartitionedTable(tableSchema,"sensorTemp",`ts`hardwareId)
93
93
subscribeTable(, "sensorTemp", "save_to_db", -1, append!{dfsTable}, true, 1000000,10)
94
94
```
95
- > * 需要观察分布式数据,可以通过以下两种途径 1.可以通过集群管理web界面上的Dfs Explorer来观察。2. 可以通过dfsTable = database("dfs://iotDemoDB").loadTable("sensorTemp"); select top 100 * from dfsTable 来观察表内的实时记录*
96
95
97
- 在对流数据做分布式保存数据库的同时,系统使用DolphinDB内置的 createStreamAggregator 实时运算函数来定义实时运算的过程。脚本里指定了窗口大小为60秒,每2秒钟对窗口宽度内的温度数据做一次求均值运算,其中第三个参数是运算的元代码,可以由用户自己指定计算函数,任何系统支持的或用户自定义的聚合函数这里都能支持。最后通过subscribeTable订阅高频流数据,并在有新数据进来时触发实时计算,并将运算结果保存到一个新的数据流表sensorTempAvg中。
98
-
99
- > * createStreamAggregator 参数说明:窗口时间,运算间隔时间,聚合运算元代码,原始数据输入表,运算结果输出表,时序字段,分组字段,触发GC记录数阈值。*
96
+ 在对流数据做分布式保存数据库的同时,系统使用DolphinDB内置的 createStreamAggregator 实时运算函数来定义实时运算的过程。函数第一个参数指定了窗口大小为60秒,第二个参数指定每2秒钟做一次求均值运算,第三个参数是运算的元代码,可以由用户自己指定计算函数,任何系统支持的或用户自定义的聚合函数这里都能支持,通过指定分组字段hardwareId,函数会将流数据按设备分成1000个队列进行均值运算,每个设备都会按各自的窗口计算得到对应的平均温度。最后通过subscribeTable订阅高频流数据,在有新数据进来时触发实时计算,并将运算结果保存到一个新的数据流表sensorTempAvg中。
97
+ createStreamAggregator 参数说明:窗口时间,运算间隔时间,聚合运算元代码,原始数据输入表,运算结果输出表,时序字段,分组字段,触发GC记录数阈值。
100
98
101
99
```
102
100
share streamTable(1000000:0, `time`hardwareId`tempavg1`tempavg2`tempavg3, [TIMESTAMP,INT,DOUBLE,DOUBLE,DOUBLE]) as sensorTempAvg
103
101
metrics = createStreamAggregator(60000,2000,<[avg(temp1),avg(temp2),avg(temp3)]>,sensorTemp,sensorTempAvg,`ts,`hardwareId,2000)
104
102
subscribeTable(, "sensorTemp", "metric_engine", -1, append!{metrics},true)
105
103
```
106
- 在DolphinDB Server端在对高频数据流做保存、分析的时候,Grafana前端程序每秒钟会轮询实时运算的结果,并刷新平均温度的趋势图。关于Grafana的安装以及DolphinDB的接口配置请参考[ Grafana配置教程] ( https://www.github.com/dolphindb/grafana-datasource/blob/master/README.md )
104
+
105
+ 在DolphinDB Server端在对高频数据流做保存、分析的时候,Grafana前端程序每秒钟会轮询实时运算的结果,并刷新平均温度的趋势图。DolphinDB提供了Grafana_DolphinDB的datasource插件,关于Grafana的安装以及DolphinDB的插件配置请参考[ Grafana配置教程] ( https://www.github.com/dolphindb/grafana-datasource/blob/master/README.md )
107
106
。在完成grafana的基本配置之后,新增一个Graph Panel, 在Metrics tab里输入
108
107
109
108
```
110
109
select gmtime(time) as time, tempavg1, tempavg2, tempavg3 from sensorTempAvg where hardwareId = 1
111
110
```
112
111
> * 这段脚本是选出1号设备实时运算得到的平均温度表*
113
112
113
+ ![ image] ( https://github.com/dolphindb/Tutorials_CN/blob/master/images/datasource.JPG )
114
+
115
+
114
116
最后,启动数据模拟程序,生成高频数据并写入流数据表
115
117
> * 高频数据规模: 1000 个设备,以每个点3个维度、10ms的频率生成数据,以每个维度8个Byte ( Double类型 ) 计算,数据流速是 24Mbps,持续100秒。*
116
118
```
0 commit comments