温馨提示:这篇文章已超过424天没有更新,请注意相关的内容是否还可用!
摘要:Java可以通过使用EMQX来实现物联网MQTT通信。EMQX是一个开源的MQTT消息代理,用于处理物联网设备和应用程序之间的通信。通过Java与EMQX集成,开发人员可以轻松地建立可靠的MQTT连接,实现设备之间的数据传输和通信。这种实现方式有助于简化物联网系统的开发过程,提高系统的可靠性和性能。
文章目录
- 前言
- 一、介绍
- 1、MQTT
- 2、EMQX
- 3、Mria 集群架构
- 4、MQTTX
- 二、SpringBoot 集成 EMQX
- 1、yaml 配置
- 2、Properties 配置类
- 3、客户端连接实体 model
- 4、token 服务类
- 5、客户端 api
- 三、SpringBoot 集成 MQTT
- 1、pom 依赖
- 2、yaml 配置
- 3、Properties 配置类
- 4、连接工厂类
- 5、MQTT 回调类
- 6、MQ 服务类
- 四、MQTT 的重连策略
- 五、EMQX 的 Windows 部署启动方式
- 六、疑难解答
- 1、避免消息发送速率过快
- 2、判断 MQTT 客户端连接状态
- 总结
前言
EMQX 实现物联网 MQTT 通信。物联网的 MQ 消息通信方式。
一、介绍
1、MQTT
MQTT(Message Queuing Telemetry Transport, 消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为远程连接设备提过实时可靠的消息服务,作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(loT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。
特点: 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合; 对负载内容屏蔽的消息传输; 使用 TCP/IP 提供网络连接; 有三种消息发布服务质量: 小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量; 使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。
2、EMQX
EMQX 是一个「无限连接,任意集成,随处运行」大规模分布式物联网接入平台。
EMQX 企业版提供一体化的分布式 MQTT 消息服务和强大的 IoT 规则引擎,为高可靠、高性能的物联网实时数据移动、处理和集成提供动力,助力企业快速构建关键业务的 IoT 平台与应用。附下载地址: https://www.emqx.com/zh/try?product=enterprise 可以自行下载对应版本运行
优势: 海量连接:单节点支持 500 万 MQTT 设备连接,集群可水平扩展至支持 1 亿并发的 MQTT 连接。 高可靠:弹性伸缩,无单点故障。内置 RocksDB 可靠地持久化 MQTT 消息,确保无数据损失。 数据安全:端到端数据加密(支持国密),细粒度访问控制,保障数据安全,满足企业合规需求。 多协议:支持 MQTT、HTTP、QUIC、WebSocket、LwM2M/CoAP 或专有协议连接任何设备。 高性能:单节点支持每秒实时接收、处理与分发数百万条的 MQTT 消息。毫秒级消息交付时延。 易运维:图形化配置、操作与管理,实时监测运行状态。支持 MQTT 跟踪进行端到端问题分析。
3、Mria 集群架构
支持全新的 Mria 集群架构,在此架构下 EMQX 水平扩展性得到指数级提升,单个集群可以轻松支持 1 亿 MQTT 连接,这使得 EMQX 5.0 成为目前全球最具扩展性的 MQTT Broker。
在构建满足用户业务需求的更大规模集群的同时,Mria 架构还能够降低大规模部署下的脑裂风险以及脑裂后的影响,以提供更加稳定可靠的物联网数据接入服务。
具体可以查看官方文档: https://docs.emqx.com/zh/enterprise/v5.1/deploy/cluster/create-cluster.html
4、MQTTX
MQTTX 是由 EMQ 开发的一款开源跨平台 MQTT 5.0 桌面客户端,它兼容 macOS,Linux 以及 Windows 系统。MQTTX 的用户界面 UI 采用聊天式设计,使得操作逻辑更加简明直观。它支持用户快速创建和保存多个 MQTT 连接,便于测试 MQTT/MQTTS 连接,以及 MQTT 消息的订阅和发布。
主要功能
采用聊天界面设计,使得操作更加简单明了
跨平台兼容,支持在 Windows,macOS,Linux 系统上运行
100% 兼容 MQTT v5.0,v3.1.1 和 v3.1 协议
订阅的 MQTT 主题支持自定义颜色标签
支持单向和双向 SSL 认证,同时支持 CA 和自签名证书
支持通过 WebSocket 连接 MQTT 服务器
支持 Hex, Base64, JSON, Plaintext 等 Payload 格式转换
自定义脚本支持模拟 MQTT 发布/订阅测试
提供完整的日志记录功能
多语言支持:简体中文、英语、日语、土耳其语及匈牙利语 ??? ??? ??? ??? ???
自由切换 Light、Dark、Night 三种主题模式
二、SpringBoot 集成 EMQX
1、yaml 配置
# EMQX配置 emqx: # EMQX服务地址,端口号默认18083 url: http://127.0.0.1:18083 # 认证用户名 username: admin # 密码 password: admin123456
2、Properties 配置类
/** * EMQX配置 */ @Data @Configuration @ConfigurationProperties(prefix = "emqx") public class EmqxConfig { private String url; private String username; private String password; }
3、客户端连接实体 model
客户端连接实现模型类。
/** * EMQX客户端连接model */ @Data public class EmqxClientModel { Integer awaiting_rel_cnt; Integer awaiting_rel_max; Boolean clean_start; /** * 客户端id */ String clientid; /** * 连接状态 */ Boolean connected; Long connected_at; Long created_at; Long disconnected_at; Integer expiry_interval; Integer heap_size; Integer inflight_cnt; Integer inflight_max; /** * ip地址 */ String ip_address; Boolean is_bridge; /** * 心跳检测时间s */ Integer keepalive; Integer mailbox_len; Integer mqueue_dropped; Integer mqueue_len; /** * 消息队列最大长度 */ Integer mqueue_max; String node; Integer port; String proto_name; Integer proto_ver; Integer recv_cnt; Integer recv_msg; }
4、token 服务类
提供获取 token 的方法。
@Component @RequiredArgsConstructor public class EmqxTokenService { private final EmqxConfig emqxConfig; public String getToken(){ String authentication = emqxConfig.getUsername() + ":" + emqxConfig.getPassword(); return "Basic " + Base64.getEncoder().encodeToString(authentication.getBytes()); } }
5、客户端 api
提供对外调用的 api 服务。
- 查询客户端连接状态。
- 查询客户端连接信息。
- 删除客户端连接。
@Slf4j @Component @RequiredArgsConstructor public class EmqxClientsApi { private final EmqxConfig emqxConfig; private final EmqxTokenService emqxTokenService; /** * 根据客户端id查询连接状态 * * @param clientId 客户端id * @return 连接状态 */ public boolean getConnectedStatus(String clientId) { EmqxClientModel client = getByClientId(clientId); if (client == null) { return false; } return client.getConnected(); } /** * 根据客户端id查询客户端信息 * * @param clientId 客户端id * @return 客户端信息 */ public EmqxClientModel getByClientId(String clientId) { String url = String.format(emqxConfig.getUrl() + "/clients/%s", clientId); HttpResponse httpResponse; try { httpResponse = HttpRequest.get(url) .header("Authorization", emqxTokenService.getToken()) .execute(); } catch (Exception e) { log.info("未查到emqx客户端:clientId=" + clientId + "[msg]:" + e.getMessage()); return null; } if (httpResponse != null && httpResponse.getStatus() == 200) { return JSON.parseObject(httpResponse.body(), EmqxClientModel.class); } return null; } /** * 根据客户端id删除客户端 * * @param clientId 客户端id * @return 客户端信息 */ public void delete(String clientId) { String url = String.format(emqxConfig.getUrl() + "/clients/%s", clientId); HttpResponse httpResponse; try { httpResponse = HttpRequest.delete(url) .header("Authorization", emqxTokenService.getToken()) .execute(); } catch (IORuntimeException e) { throw new ServiceException("删除emqx客户端请求超时"); } catch (Exception e) { throw new ServiceException("删除emqx客户端请求异常:clientId=" + clientId + "[msg]:" + e.getMessage()); } if (httpResponse == null || httpResponse.getStatus() != 204) { throw new ServiceException("删除emqx客户端失败:clientId=" + clientId); } } }
三、SpringBoot 集成 MQTT
1、pom 依赖
org.eclipse.paho org.eclipse.paho.client.mqttv3 1.2.2
2、yaml 配置
spring: # MQTT配置 mqtt: # MQTT服务地址,端口号默认1883,如果有多个,用逗号隔开 host-url: tcp://127.0.0.1:1883 # 用户名 username: admin # 密码 password: admin123456 # 客户端id(不能重复) client-id: real-mqtt-client # MQTT默认的消息推送主题,实际可在调用接口时指定 default-topic: topic
3、Properties 配置类
@Configuration @ConfigurationProperties(prefix = "spring.mqtt") @Data public class MqttConfig { private String username; private String password; private String hostUrl; private String clientId; private String defaultTopic; }
4、连接工厂类
执行 mq 初始化配置、连接、消息主题订阅。
@Slf4j @Component public class MqttFactory { public static ConcurrentHashMap clientMap = new ConcurrentHashMap(); @Autowired private MqttConfig mqttConfig; @Autowired private RealPersonAccessDeviceMapper realPersonAccessDeviceMapper; /** * 在bean初始化后连接到服务器 */ @PostConstruct public void init() { String mqttStartFlag = ParamResolver.getStr(RealCommonConstants.MQTT_START_FLAG); if (StrUtil.equals(mqttStartFlag, CommonConstants.SYS_YES_NO_Y)) { // 初始化订阅主题 initSubscribeTopic(getInstance()); } } /** * 初始化客户端 */ public MqttClient getInstance() { MqttClient client = null; if (clientMap.get(mqttConfig.getClientId()) == null) { try { client = new MqttClient(mqttConfig.getHostUrl(), mqttConfig.getClientId()); // MQTT配置对象 MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); // 设置自动重连, 其它具体参数可以查看MqttConnectOptions mqttConnectOptions.setAutomaticReconnect(true); // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接 // mqttConnectOptions.setCleanSession(true); // 设置超时时间 单位为秒 mqttConnectOptions.setConnectionTimeout(10); mqttConnectOptions.setUserName(mqttConfig.getUsername()); mqttConnectOptions.setPassword(mqttConfig.getPassword().toCharArray()); // mqttConnectOptions.setServerURIs(new String[]{url}); // 设置会话心跳时间 单位为秒 mqttConnectOptions.setKeepAliveInterval(10); // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。 // mqttConnectOptions.setWill("willTopic", "offline".getBytes(), 2, false); if (!client.isConnected()) { client.connect(mqttConnectOptions); } client.setCallback(new MqttCallBack()); log.info("MQTT创建client成功={}", JSONObject.toJSONString(client)); clientMap.put(mqttConfig.getClientId(), client); } catch (MqttException e) { log.error("MQTT连接消息服务器[{}]失败", mqttConfig.getClientId() + "-" + mqttConfig.getHostUrl()); } } else { client = clientMap.get(mqttConfig.getClientId()); log.info("MQTT从map里获取到client,clientId=" + mqttConfig.getClientId()); // TODO 已采用自动重连策略 // log.info("MQTT从map里获取到client={}", JSONObject.toJSONString(client)); // if (!client.isConnected()) { // initSubscribeTopic(client); // 如果缓存里的client已经断开,则清除该缓存,再重新创建客户端连接 // clientMap.remove(mqttConfig.getClientId()); // this.getInstance(); // } } return client; } /** * 初始化订阅主题 *
* 消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息 */ public void initSubscribeTopic(MqttClient client) { // 订阅设备发布消息主题 List upstreamTopics = new ArrayList(); List upstreamQos = new ArrayList(); upstreamTopics.add("topic_1"); upstreamQos.add(1); upstreamTopics.add("topic_2"); upstreamQos.add(0); upstreamTopics.add("topic_3"); upstreamQos.add(1); try { client.subscribe(upstreamTopics.toArray(new String[upstreamTopics.size()]), upstreamQos.stream().mapToInt(Integer::intValue).toArray()); } catch (MqttException e) { e.printStackTrace(); } } } }
5、MQTT 回调类
连接断开回调,以及消息到达回调。根据消息主题进行消息分发。
@Slf4j @Component public class MqttCallBack implements MqttCallback, MqttCallbackExtended { /** * 客户端断开连接的回调 */ @Override public void connectionLost(Throwable throwable) { log.info("客户端断开连接回调"); } @Override public void connectComplete(boolean reconnect, String serverURI) { log.info("客户端断开连接重连"); // 重新订阅 MqttFactory client = SpringContextHolder.getBeanFactory().getBean(MqttFactory.class); client.initSubscribeTopic(client.getInstance()); log.info("重连成功"); } /** * 消息到达的回调 */ @Override public void messageArrived(String topic, MqttMessage mqttMessage) { // 日志输出 MqttFactory client = SpringContextHolder.getBeanFactory().getBean(MqttFactory.class); log.info("mqtt客户端ID : {}", client.getInstance().getClientId()); log.info("mqtt接收消息主题 : {}", topic); log.info("mqtt接收消息Qos : {}", mqttMessage.getQos()); log.info("mqtt接收消息retained : {}", mqttMessage.isRetained()); log.info("mqtt接收消息内容 : {}", new String(mqttMessage.getPayload())); String message = new String(mqttMessage.getPayload()); // 消息内容 // TODO 根据消息主题进行消息分发 } /** * 消息发布成功的回调 */ @Override public void deliveryComplete(IMqttDeliveryToken token) { IMqttAsyncClient client = token.getClient(); log.info(client.getClientId() + " 发布消息成功!"); } }
6、MQ 服务类
提供消息发布等方法。
这里对发送消息方法进行了自定义封装,增加了 redis 对发送的消息进行存储,在异步的响应消息返回时,利用 redis 中发送消息的消息id实现响应数据的异步绑定。响应消息存在丢失的情况。需要合理设置发送消息的过期时间,防止时间过短导致返回的响应丢失,防止时间过长占用 redis 资源。
@Slf4j @Data @Configuration public class UMqttClientService { private final MqttFactory mqttFactory; private final StringRedisTemplate redisTemplate; /** * 订阅主题 */ public void subscribeTopic(String deviceNo) { try { // 订阅设备发布消息主题 List upstreamTopics = new ArrayList(); upstreamTopics.add(String.format(UMqttCommonConstants.EVENT, deviceNo)); upstreamTopics.add(UMqttCommonConstants.ONLINE); upstreamTopics.add(String.format(UMqttCommonConstants.RESPONSE, deviceNo)); upstreamTopics.add(String.format(UMqttCommonConstants.UPLOAD, deviceNo)); int[] upstreamQos = {1, 1, 2, 0}; mqttFactory.getInstance().subscribe(upstreamTopics.toArray(new String[0]), upstreamQos); } catch (MqttException e) { e.printStackTrace(); } } /** * 取消订阅主题 */ public void stopSubscribeTopic(String deviceNo) { try { // 取消订阅设备发布消息主题 List upstreamTopics = new ArrayList(); upstreamTopics.add(String.format(UMqttCommonConstants.EVENT, deviceNo)); upstreamTopics.add(UMqttCommonConstants.ONLINE); upstreamTopics.add(String.format(UMqttCommonConstants.RESPONSE, deviceNo)); upstreamTopics.add(String.format(UMqttCommonConstants.UPLOAD, deviceNo)); mqttFactory.getInstance().unsubscribe(upstreamTopics.toArray(new String[0])); } catch (MqttException e) { e.printStackTrace(); } } /** * 断开连接 */ public void disConnect() { try { mqttFactory.getInstance().disconnect(); } catch (MqttException e) { e.printStackTrace(); } } /** * 订阅主题 */ public void subscribe(String topic, int qos) { try { mqttFactory.getInstance().subscribe(topic, qos); } catch (MqttException e) { e.printStackTrace(); } } /** * 发布请求设备消息 * * @param deviceNo 设备编号 * @param message 消息 */ public void publish(String deviceNo, String message) { publish(1, false, String.format(UMqttCommonConstants.REQUEST, deviceNo), message); } /** * 发布请求设备消息 */ public void publish(UMqttPublishDate publishDate) { // 将消息id和方法名存到redis中:缓存3分钟 redisTemplate.opsForValue().set(UMqttCommonConstants.UMQTT_MSG_REDIS_KEY + publishDate.getId(), JSON.toJSONString(publishDate), 24, TimeUnit.HOURS); publish(1, false, String.format(UMqttCommonConstants.REQUEST, publishDate.getDeviceNo()), publishDate.getMessage()); } /** * 发布响应设备消息 */ public void publishResponse(UMqttPublishDate publishDate) { // 将消息id和方法名存到redis中:缓存3分钟 redisTemplate.opsForValue().set(UMqttCommonConstants.UMQTT_MSG_REDIS_KEY + publishDate.getId(), JSON.toJSONString(publishDate), 24, TimeUnit.HOURS); publish(1, false, String.format(UMqttCommonConstants.RESPONSE, publishDate.getDeviceNo()), publishDate.getMessage()); } /** * 发布消息 * * @param qos qos * @param retained retained * @param topic 主题 * @param message 消息 */ public void publish(int qos, boolean retained, String topic, String message) { log.info("发布消息topic:" + topic); log.info("发布消息message:" + message); MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(qos); mqttMessage.setRetained(retained); mqttMessage.setPayload(message.getBytes()); //主题的目的地,用于发布/订阅信息 MqttTopic mqttTopic = mqttFactory.getInstance().getTopic(topic); //提供一种机制来跟踪消息的传递进度 //用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度 MqttDeliveryToken token; try { //将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态 //一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。 token = mqttTopic.publish(mqttMessage); token.waitForCompletion(); } catch (MqttException e) { e.printStackTrace(); } } }
四、MQTT 的重连策略
- 在 mqtt 工厂类中进行初始化连接时,设置自动重连状态为开启。
// 设置自动重连 mqttConnectOptions.setAutomaticReconnect(true);
- 在 mqtt 回调类中,设置重连处理业务。
@Override public void connectComplete(boolean reconnect, String serverURI) { log.info("客户端断开连接重连"); // 重新订阅 MqttFactory client = SpringContextHolder.getBeanFactory().getBean(MqttFactory.class); client.initSubscribeTopic(client.getInstance()); log.info("重连成功"); }
MQTT 连接失效时,会自动进行重连,执行自定义的重连策略,重连过程中 MQTT 消息服务停止,消息处理等业务会抛出异常。
五、EMQX 的 Windows 部署启动方式
EMQX 服务需要延时启动,因为部署服务器开机时 EMQX 服务需要等待完成一些初始化操作。保证 EMQX 服务在业务服务启动前启动。
bat 脚本部署方案:
- 方式一:(每次停止服务后需重启电脑)
@echo off start cmd /k "cd /d D:\Program Files\emqx-5.0.11-windows-amd64\bin && emqx start" echo start magic-demo-biz java -jar -Dfile.encoding=utf-8 magic-demo-biz.jar
- 方式二:(管理员权限cmd进入D:\Program Files\emqx-5.0.11-windows-amd64\bin,执行emqx install安装成服务,将emqx服务设置为手动启动)
@echo off sc start emqx echo start magic-demo-biz java -jar -Dfile.encoding=utf-8 magic-demo-biz.jar
六、疑难解答
1、避免消息发送速率过快
当需要批量发送大量消息时,如果消息发送频率过快,会导致 EMQX 服务器会主动将当前发送消息的客户端连接断开,因此在发送消息时需要控制 MQTT 消息的发送频率。
for (int i = 0; i
2、判断 MQTT 客户端连接状态
会有 MQTT 客户端连接存在但连接状态为已断开的情况。因此判断 MQTT 客户端连接状态时,需要获取 MQTT 客户端连接的实际连接状态,而不是仅判断 MQTT 客户端连接是否存在。
/** * 根据客户端id查询连接状态 * * @param clientId 客户端id * @return 连接状态 */ public boolean getConnectedStatus(String clientId) { EmqxClientModel client = getByClientId(clientId); if (client == null) { return false; } return client.getConnected(); // 查询实际连接状态 }
总结
MQTT 原理同 MQ 消息发送与接收,EMQX 的 MQTT 客服端连接即消息队列。
还没有评论,来说两句吧...