1、application.yml配置
spring:
  # NOTE(review): nesting restored on the assumption that MqttProperties binds
  # the "spring.mqtt" prefix - confirm against that class.
  mqtt:
    username: test
    # NOTE(review): plain-text credential; prefer an externalised secret
    # (environment variable / placeholder) outside of demo setups.
    password: qwerty123
    host-url: tcp://172.18.42.34:32016
    # Used by MqttCustomClient.getPubTopic as the suffix of the publish topic.
    client-id: /dataProcessingTopicwf
    subscribe-id: /dataProcessing
    # Passed to MqttConnectOptions.setConnectionTimeout, which Paho documents
    # as a value in seconds. NOTE(review): 100000 looks like it was meant as ms.
    timeout: 100000
    # Passed to MqttConnectOptions.setKeepAliveInterval (seconds).
    keep-alive-interval: 100
    # Topic filter subscribed to right after the connection is established.
    defaultTopic: $queue/+/dataProcessing
2、mqttclient
package com.catl.mqttutil.mqtt.client;
import com.catl.mqttutil.mqtt.callback.PushCallback;
import com.catl.mqttutil.mqtt.model.BytesModel;
import com.catl.mqttutil.mqtt.properties.MqttProperties;
import com.catl.mqttutil.mqtt.utils.MqttUtils;
import io.netty.buffer.ByteBuf;
import io.netty.util.internal.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
@Slf4j
public class MqttCustomClient {
@Autowired
private PushCallback pushCallback;
@Autowired
private MqttProperties mqttProperties;
private static MqttClient client;
public static MqttClient getClient() {
return client;
}
private static void setClient(MqttClient client) {
MqttCustomClient.client = client;
}
// public static String pubTopic;
@PostConstruct
public void init() {
// ReqHeaderUtil.platformKey = stationProperties.getPlatformkey();
// pubTopic = stationProperties.getPlatformkey() + mqttProperties.getClientId();
this.connect();
}
public String getPubTopic(String platformkey) {
return platformkey + mqttProperties.getClientId();
}
public void connect() {
MqttClient client;
try {
log.info("开始连接mqtt服务端,mqttProperties={}", mqttProperties);
client = new MqttClient(mqttProperties.getHostUrl(), "111111111111100", new MemoryPersistence());
MqttCustomClient.setClient(client);
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setUserName(mqttProperties.getUsername());
options.setPassword(mqttProperties.getPassword().toCharArray());
options.setConnectionTimeout(mqttProperties.getTimeout());
options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
options.setAutomaticReconnect(true);
client.setCallback(pushCallback);
client.connect(options);
if (client.isConnected()) {
client.subscribe(mqttProperties.getDefaultTopic(), 1);
} else {
IMqttToken token = client.connectWithResult(options);
token.waitForCompletion();
client.subscribe(mqttProperties.getDefaultTopic(), 1);
}
log.info("mqtt连接信息【subTopic=>{}】", mqttProperties.getDefaultTopic());
} catch (Exception e) {
if (e.getMessage().contains("已连接") || e.getMessage().contains("connected")) {
log.info("mqtt客户端已连接", e);
} else {
log.error("mqtt客户端连接异常", e);
}
}
}
/**
* 发布
*
* @param qos 连接方式
* @param retained 是否保留
* @param topic 主题
* @param pushMessage 消息体
*/
public void publish(int qos, boolean retained, String topic, ByteBuf pushMessage) {
MqttMessage message = new MqttMessage();
message.setQos(qos);
message.setRetained(retained);
byte[] array = pushMessage.array();
message.setPayload(array);
MqttTopic mTopic = MqttCustomClient.client.getTopic(topic);
if (null == mTopic) {
log.error("订阅topic[{}]不存在", topic);
}
MqttDeliveryToken token;
try {
token = mTopic.publish(message);
token.waitForCompletion();
token.setActionCallback(new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken iMqttToken) {
log.debug("mqtt服务器接收消息(publish) - 成功");
}
@Override
public void onFailure(IMqttToken iMqttToken, Throwable throwable) {
log.debug("EMQX服务器接收消息失败!");
}
});
log.error("发布topic[{}]成功", topic);
} catch (MqttException e) {
log.error("发布topic[{}]异常", topic);
}
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 订阅某个主题
*
* @param topic 主题
* @param qos 连接方式
*/
public void subscribe(String topic, int qos) {
log.info("【sub-开始订阅消息】,topic:【{}】,qos:【{}】", topic, qos);
try {
MqttCustomClient.client.subscribe(topic, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* @param commandSign 命令标识
* @param vin 车辆vin码
* @param dataBytes 数据
* @description 发布主题消息
* @date 2021/7/19 下午5:03
* @author junliu
**/
public void publish(short commandSign, String vin, byte[] dataBytes, String pubTopic) {
vin = StringUtil.isNullOrEmpty(vin) ? vin : "00000000000000000000";
BytesModel model = new BytesModel(commandSign, vin, dataBytes);
System.out.println("model" + model);
ByteBuf byteBuf = MqttUtils.toByteBuf(model);
System.out.println("byteBuf" + byteBuf);
publish(1, false, pubTopic, byteBuf);
}
/**
* @param commandSign 命令标识
* @param dataBytes 数据
* @description 发布主题消息
* @date 2021/7/19 下午5:03
* @author junliu
**/
public void publish(short commandSign, byte[] dataBytes, String pubTopic) {
this.publish(commandSign, "00000000000000000", dataBytes, pubTopic);
}
}
3、pushcallback
package com.catl.mqttutil.mqtt.callback;
import com.catl.mqttutil.mqtt.client.MqttCustomClient;
import com.catl.mqttutil.mqtt.model.BaseModel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 消费监听类
*
* @author yanxinghua
* @since 2021/7/19 9:13
*/
@Component
@Slf4j
public class PushCallback implements MqttCallback {
@Override
public void connectionLost(Throwable throwable) {
log.error("mqtt连接断开,错误信息msg-{}", throwable.getMessage());
if (MqttCustomClient.getClient() == null || !MqttCustomClient.getClient().isConnected()) {
log.info("mqtt重连中");
}
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) {
try {
log.info(">>>>>>>>>接收消息主题 : {},接收消息Qos :{}", topic, mqttMessage.getQos());
byte[] payload = mqttMessage.getPayload();
ByteBuf byteBuf = Unpooled.wrappedBuffer(payload);
// 获取控制类型
byte[] msgIdBytes = new byte[1];
byteBuf.getBytes(2, msgIdBytes, 0, 1);
// short msgId = Short.valueOf(ByteBufUtil.hexDump(reverse(msgIdBytes)), 16);
// log.warn(">>>>>>>>>【{}消息报文】:{}", msgId, ByteBufUtil.hexDump(payload));
BaseModel model = new BaseModel(byteBuf);
model.parseBody();
// SpringUtils.applicationContext.publishEvent(model);
} catch (Exception e) {
log.error("接受消息解析失败!msg-{}", e);
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
}
}
4、pom.xml
<!--
  MQTT support from Spring Integration.
  No <version> element is declared here, so the version has to come from the
  build's dependency management (NOTE(review): presumably the Spring Boot BOM; confirm).
  NOTE(review): the classes above also import io.netty.buffer and lombok; verify
  that those libraries are declared elsewhere in the pom.
-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
原创文章,作者:3628473679,如若转载,请注明出处:https://blog.ytso.com/273780.html