教程:使用消息集成从虚拟的第三方系统中集成设备历史数据¶
你可以使用消息集成,从第三方系统中导入设备的历史数据和离线缓存数据。
本教程模拟了如下情境:在第三方系统存有历史数据的某设备,现因业务需要,要将该设备接入EnOS。该设备在第三方系统遗留的测点的历史数据也需要接入EnOS,如下图所示:

接下来本文将指导你完成场景中的历史数据迁移任务。
任务描述¶
本教程需要完成的步骤,如下图所示:

下文的操作将按照流程图中的步骤编号进行,实线箭头表示各步骤之间的依赖关系:
创建好模型之后才能配置存储策略
创建好产品才能创建消息集成通道
你也可以在遵循依赖关系的前提下自行安排执行每项操作的顺序。
虚线箭头表示测点数据的传输方向:由虚拟的第三方系统、经过消息集成通道、最后到达EnOS的时序数据管理TSDB。
前提条件¶
你需要有开发者角色,以访问应用门户的开发者控制台,如果没有需联系组织管理员添加。
在自己的计算机上安装了Java开发环境。
步骤¶
步骤 1:创建模型、产品,注册设备¶
创建一个虚拟电流表的模型,其具体参数如下,创建模型的具体步骤参见创建模型:
虚拟电流表模型¶ 字段
取值
模型标识符
test.ammeter
模型名称
虚拟电流表
分类
无
模型关系
无
模型模板
无
模型描述
无
为该设备定义如下的要素:
虚拟电流表模型要素¶ 字段
取值
功能类型
测点
名称
实时电流
标识符
current
测点类型
AI
是否有质量位
无
数据类型
double
单位
电流:毫安 | mA
自定义标签
无
描述
无
基于 虚拟电流表 模型,创建一个产品,具体参数如下,创建产品的具体方法,参见创建产品(设备集合):
虚拟电流表产品¶ 字段
取值
产品名称
虚拟电流表产品
节点类型
设备
模型
虚拟电流表
数据格式
JSON
证书双向认证
禁用
产品描述
无
基于 虚拟电流表产品,注册一个虚拟电流表设备,其参数如下,注册设备的具体方法,参见注册设备:
虚拟电流表设备¶ 字段
取值
产品
虚拟电流表设备
Device Key
simulatedAmmeter
设备名称
虚拟电流表设备
时区/城市
UTC+08:00
步骤 2:配置时序数据存储策略¶
在 时序数据管理 > 存储策略 中,为 虚拟电流表产品 配置TSDB存储策略。
选择一个已有的策略分组,或者新建一个分组,点击 修改分组,勾选 test.ammeter,点击 确认,将该模型纳入到该策略分组中。
点击 AI原始数据 右边的
,勾选 test.ammeter,将该模型下所有测点加入当前存储策略。
由于之前配置的测点 实时电流 为AI类型数据,本教程也无需对其进行归一化处理,所以将测点加入 AI原始数据 存储策略中。
步骤 3:创建消息集成通道¶
步骤 4:使用MQTT Client发送模拟数据¶
将如下Java示例代码复制到你使用的代码编辑器,根据注释及下文说明替换其中参数,然后运行:
import org.eclipse.paho.client.mqttv3.*; //使用Eclipse Paho MQTT客户端模拟第三方系统
public class IntegrationSample {
public static String brokerURL = "tcp://tcp-address:port"; //点击通道的 详情 按钮,可以查看Broker URL
public static String clientId = "id12345|securemode=4,signmethod=sha1,timestamp=6355824601597|"; //点击通道的 详情 按钮,可以查看ClientID
public static String userName = "%channel%&12345"; //点击通道的 详情 按钮,可以查看Username
public static String password = "7C4095981435F3ABCD4E6CFFF8F7E27F26975505"; //点击通道的 详情 按钮,可以查看Password
public static MqttClient sampleClient=null;
public static void postPoint(int i) throws MqttException { //以下函数用于构造要发送的Json字符串
String topic = "/sys/O0DUTcD8/integration/measurepoint/post"; //用于测点数据上报请求的MQTT Topic,点击通道的 详情 按钮,可以查看
long currTs=1570024800000L+10*i; //构造一个历史时间的时间戳,并不断累加变量i(整形)以模拟一个时间段。该时间戳必须精确到毫秒,即必须为一个13位的长整型。该时间段须早于当前时间。
double value=28.01+0.1*i; //模拟测点数据
String content="{\n" +
" \"id\":\""+currTs+"\",\n" +
" \"version\":\"1.0\",\n" +
" \"method\":\"integration.measurepoint.post\",\n" +
" \"params\":[\n" +
" {\n" +
" \"deviceKey\":\"simulatedAmmeter\",\n" +
" \"time\":"+currTs+",\n" +
" \"measurepoints\":{\n" +
" \"current\":"+ value +" \n" +
" }\n" +
" }\n" +
" ]\n" +
"}";
//构造一个request的数据,其结构和各参数的含义参见 https://www.envisioniot.com/docs/device-connection/zh_CN/3.0.0/reference/mqtt_offline/report_offline_points.html
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(1);
sampleClient.publish(topic, message); //发布消息
}
public static void main(String[] args) throws MqttException {
sampleClient = new MqttClient(brokerURL, clientId); //创建MQTT客户端
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(false);
connOpts.setUserName(userName);
connOpts.setPassword(password.toCharArray());
//连接
sampleClient.connect(connOpts);
for(int i=0;i<1000;i++ ){ //连续发送1000次测点数据
postPoint(i);
}
sampleClient.disconnect(); //断开连接
sampleClient.close();
}
}
上述代码中的brokerURL
、clientId
、userName
、password
、topic
,可以通过点击 simulated_ammeter_integration 的 详情 按钮以查看:

结果¶
等待一段时间后,选择 时序数据管理 > 时序洞察。选择 虚拟电流表设备, 将查询时间段设置为前一步的代码段中currTs
代表的时间段,选择 current 测点。观察到该历史时间段的测点数据已经存储于TSDB当中。
