admin管理员组文章数量:1582042
MQTT断线重连及订阅消息恢复
注意注意,MQTT重连后需要重新订阅主题才能重新接收到消息
我这里使用的是
//设置断开后重新连接
options.setAutomaticReconnect(true);
@Override
public void connectionLost(Throwable throwable) {
log.error("连接断开,下面做重连...");
long reconnectTimes = 1;
while (true) {
try {
if (mqttClient.isConnected()) {
log.warn("mqtt reconnect success end");
break;
}
if(reconnectTimes == 10){
//当重连次数达到10次时,就抛出异常,不在重连
log.warn("mqtt reconnect error");
return;
}
log.warn("mqtt reconnect times = {} try again...", reconnectTimes++);
mqttClient.reconnect();
} catch (MqttException e) {
log.error("", e);
}
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
// e1.printStackTrace();
}
}
}
看MQTT的connec的源码发现了一段代码使我找到了解决方案
MqttAsyncClient 的 connect()方法
public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback) throws MqttException, MqttSecurityException {
......
//省略
......
comms.setNetworkModules(createNetworkModules(serverURI, options));
comms.setReconnectCallback(new MqttReconnectCallback(automaticReconnect));
// Insert our own callback to iterate through the URIs till the connect
// succeeds
MqttToken userToken = new MqttToken(getClientId());
ConnectActionListener connectActionListener = new ConnectActionListener(this, persistence, comms, options,
userToken, userContext, callback, reconnecting);
userToken.setActionCallback(connectActionListener);
userToken.setUserContext(this);
// If we are using the MqttCallbackExtended, set it on the
// connectActionListener
if (this.mqttCallback instanceof MqttCallbackExtended) {
connectActionListener.setMqttCallbackExtended((MqttCallbackExtended) this.mqttCallback);
}
comms.setNetworkModuleIndex(0);
connectActionListener.connect();
return userToken;
}
MqttReconnectCallback 是实现MqttCallbackExtended接口的
发现comms中有设置重连的回调对象
comms.setReconnectCallback(new MqttReconnectCallback(automaticReconnect));
但是怎么把这个回调由我们来主动放进去呢?继续往下看源码可以发现
MqttReconnectCallback对象只是在连接丢失connectionLost的时候进行循环连接
点击startReconnectCycle()最终又会回到
MqttAsyncClient 的 connect()方法
class MqttReconnectCallback implements MqttCallbackExtended {
final boolean automaticReconnect;
MqttReconnectCallback(boolean isAutomaticReconnect) {
automaticReconnect = isAutomaticReconnect;
}
public void connectionLost(Throwable cause) {
if (automaticReconnect) {
// Automatic reconnect is set so make sure comms is in resting
// state
comms.setRestingState(true);
reconnecting = true;
startReconnectCycle();
}
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
}
public void deliveryComplete(IMqttDeliveryToken token) {
}
public void connectComplete(boolean reconnect, String serverURI) {
}
}
也就是如果我们在之前放入client的回调对象是实现的 MqttCallbackExtended 接口,则MQTT会将我们的回调对象放入 connectActionListener 中 然后由 connectActionListener实现具体的connect
接下来我们将 MessageCallback 对象改为实现 MqttCallbackExtended这个接口,然后实现下面方法
mqttClient.setCallback(new MqttCallbackExtended () {
/**
* Called when the connection to the server is completed successfully.
*
* @param reconnect If true, the connection was the result of automatic reconnect.
* @param serverURI The server URI that the connection was made to.
*/
@Override
public void connectComplete(boolean reconnect, String serverURI) {
try{
//如果监测到有,号,说明要订阅多个主题
if(mqttTopic.contains(",")){
//多主题
String[] mqttTopics = mqttTopic.split(",");
mqttClient.subscribe(mqttTopics);
}else{
//单主题
mqttClient.subscribe(mqttTopic);
}
log.info("----TAG", "connectComplete: 订阅主题成功");
}catch(Exception e){
e.printStackTrace();
log.info("----TAG", "error: 订阅主题失败");
}
}
然后可能在同一个环境,比方测试服和本地,创建同ip端口,用户密码clientId一样的客户端,那么2边会占用资源,需要加上异常报错,我的处理方式是连接10次不行就让他掉线,还需要在报错的地方加上处理:
//当创建客户端的时候出现 已断开连接,有可能是在另一个环境下启动了该客户端,直接吧这边的客户端关闭,不然另一边会无限重连
if(e.getMessage().equals("已断开连接") || e.getMessage().equals("客户机未连接")){
try {
mqttClient.close();
} catch (MqttException ex) {
ex.printStackTrace();
}
}
以下是我的开发完整代码,使用了多线程方式创建,
package com.t4cloud.t.sensor.entity;
import com.t4cloud.t.base.redis.topic.entity.RedisMsg;
import com.t4cloud.t.base.utils.RedisTopicUtil;
import com.t4cloud.t.sensor.constant.MqttClientManager;
import com.t4cloud.t.sensor.entity.vo.SensorMqttMsg;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
//MQTT客户端线程
@Slf4j
public class MqttClientThread extends Thread{
//连接地址
private String serverURL;
//MQTT客户端登录用户名
private String mqttUsername;
//MQTT客户端密码
private String mqttPassWord;
//MQTT订阅主题
private String mqttTopic;
//MQTT的client
private String clientId;
//产品id
private String productId;
//推送至我们自己的RedisTopIc中channel
private String channel = "mqtt";
//mqtt实体类
private MqttClient mqttClient;
//构造函数
public MqttClientThread(String serverURL,String mqttUsername,String mqttPassWord,String mqttTopic,String clientId,String productId) {
this.serverURL = serverURL;
this.mqttUsername = mqttUsername;
this.mqttPassWord = mqttPassWord;
this.mqttTopic = mqttTopic;
this.clientId = clientId;
this.productId = productId;
}
//线程方法
public void run(){
try {
// host为主机名,clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,
// MemoryPersistence设置clientid的保存形式,默认为以内存保存,就用username
mqttClient = new MqttClient(serverURL, clientId, new MemoryPersistence());
// 配置参数信息
MqttConnectOptions options = new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
// 这里设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(true);
// 设置用户名
options.setUserName(mqttUsername);
// 设置密码
options.setPassword(mqttPassWord.toCharArray());
// 设置超时时间 单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
// options.setKeepAliveInterval(20);
//设置断开后重新连接
options.setAutomaticReconnect(true);
// 连接
mqttClient.connect(options);
// 订阅
//如果监测到有,号,说明要订阅多个主题
if(mqttTopic.contains(",")){
//多主题
String[] mqttTopics = mqttTopic.split(",");
mqttClient.subscribe(mqttTopics);
}else{
//单主题
mqttClient.subscribe(mqttTopic);
}
// 设置回调
mqttClient.setCallback(new MqttCallbackExtended () {
/**
* Called when the connection to the server is completed successfully.
*
* @param reconnect If true, the connection was the result of automatic reconnect.
* @param serverURI The server URI that the connection was made to.
*/
@Override
public void connectComplete(boolean reconnect, String serverURI) {
try{
//如果监测到有,号,说明要订阅多个主题
if(mqttTopic.contains(",")){
//多主题
String[] mqttTopics = mqttTopic.split(",");
mqttClient.subscribe(mqttTopics);
}else{
//单主题
mqttClient.subscribe(mqttTopic);
}
log.info("----TAG", "connectComplete: 订阅主题成功");
}catch(Exception e){
e.printStackTrace();
log.info("----TAG", "error: 订阅主题失败");
}
}
@Override
public void connectionLost(Throwable throwable) {
log.error("连接断开,下面做重连...");
long reconnectTimes = 1;
while (true) {
try {
if (mqttClient.isConnected()) {
log.warn("mqtt reconnect success end");
break;
}
if(reconnectTimes == 10){
//当重连次数达到10次时,就抛出异常,不在重连
log.warn("mqtt reconnect error");
return;
}
log.warn("mqtt reconnect times = {} try again...", reconnectTimes++);
mqttClient.reconnect();
} catch (MqttException e) {
log.error("", e);
}
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
// e1.printStackTrace();
}
}
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
log.info("接收消息主题 : " + topic);
log.info("接收消息Qos : " + mqttMessage.getQos());
log.info("接收消息内容 : " + new String(mqttMessage.getPayload()));
//向我们通道中发送消息
RedisMsg redisMsg = new RedisMsg();
redisMsg.setChannel(channel);
redisMsg.setMsg("推送MQTT消息");
SensorMqttMsg mqttMsg = new SensorMqttMsg();
mqttMsg.setProductId(productId);
mqttMsg.setPayload(new String(mqttMessage.getPayload()));
redisMsg.setData(mqttMsg);
RedisTopicUtil.sendMessage(channel, redisMsg);
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
//认证过程
log.info("deliveryComplete.............");
}
});
//放入缓存,根据clinetId吧mqttClient对象放进去
MqttClientManager.MQTT_CLIENT_MAP.putIfAbsent(clientId, mqttClient);
} catch (Exception e) {
e.printStackTrace();
//当创建客户端的时候出现 已断开连接,有可能是在另一个环境下启动了该客户端,直接吧这边的客户端关闭,不然另一边会无限重连
if(e.getMessage().equals("已断开连接") || e.getMessage().equals("客户机未连接")){
try {
mqttClient.close();
} catch (MqttException ex) {
ex.printStackTrace();
}
}
}
}
}
参考原文链接:https://blog.csdn/csdm_admin/article/details/119935243
版权声明:本文标题:MQTT断线重连及订阅消息恢复 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://m.elefans.com/dongtai/1727892594a1136464.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论