網站首頁 編程語言 正文
一、SpringBoot整合MQTT
創建項目,引入 MQTT依賴:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.3.12.RELEASE</version>
</dependency>
<!-- spring-integration-mqtt依賴 -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>6.1.2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.28</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
1、yml配置文件
在 application.yml文件中,定義 MQTT連接信息。
## MQTT 基本連接參數 ##
mqtt:
host: tcp://192.168.xxx.xxx:1883
# host: tcp://broker.emqx.io:1883
userName: admin
passWord: xxxxxx
qos: 1
clientId: ClientId_local #ClientId_local必須唯一。
timeout: 10 # 超時時間
keepalive: 30 # 保持連接時間
clearSession: true # 清除會話(設置為false,斷開連接,重連后使用原來的會話 保留訂閱的主題,能接收離線期間的消息)
topic1: A/b/# # 通配符主題只能用于訂閱,不能用于發布。+:表示單層通配符,#:表示多層通配符
topic2: A/abc
topic3: ABC
2、MQTT配置類
創建一個 MqttConfig配置類,并獲取配置文件的 MQTT的連接參數。創建 MyMqttClient類注入Spring。
@Slf4j
@Configuration
public class MqttConfig {
@Value("${mqtt.host}")
public String host;
@Value("${mqtt.username}")
public String username;
@Value("${mqtt.password}")
public String password;
@Value("${mqtt.clientId}")
public String clientId;
@Value("${mqtt.timeout}")
public int timeOut;
@Value("${mqtt.keepalive}")
public int keepAlive;
@Value("${mqtt.clearSession}")
public boolean clearSession;
@Value("${mqtt.topic1}")
public String topic1;
@Value("${mqtt.topic2}")
public String topic2;
@Value("${mqtt.topic3}")
public String topic3;
@Bean//注入Spring
public MyMqttClient myMqttClient() {
MyMqttClient myMqttClient = new MyMqttClient(host, username, password, clientId, timeOut, keepAlive, clearSession);
for (int i = 0; i < 10; i++) {
try {
myMqttClient.connect();
// 這里可以訂閱主題,推薦放到 MqttCallbackExtended.connectComplete方法中
//myMqttClient.subscribe("ABC", 1);
return myMqttClient;
} catch (MqttException e) {
log.error("== MqttConfig ==> MQTT connect exception, connect time = {}", i);
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
return myMqttClient;
}
}
3、MQTT 客戶端封裝類
創建 MQTT 客戶端封裝類MyMqttClient。對 MQTT Broker進行操作。
@Slf4j
public class MyMqttClient {
/**
* MQTT Broker 基本連接參數,用戶名、密碼為非必選參數
*/
private String host;
private String username;
private String password;
private String clientId;
private int timeout;
private int keepalive;
private boolean clearSession;
/**
* MQTT 客戶端
*/
private static MqttClient client;
public static MqttClient getClient() {
return client;
}
public static void setClient(MqttClient client) {
MyMqttClient.client = client;
}
public MyMqttClient(String host, String username, String password, String clientId, int timeOut, int keepAlive, boolean clearSession) {
this.host = host;
this.username = username;
this.password = password;
this.clientId = clientId;
this.timeout = timeOut;
this.keepalive = keepAlive;
this.clearSession = clearSession;
}
/**
* 設置 MQTT Broker 基本連接參數
*
* @param username
* @param password
* @param timeout
* @param keepalive
* @return
*/
public MqttConnectOptions setMqttConnectOptions(String username, String password, int timeout, int keepalive) {
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepalive);
options.setCleanSession(clearSession);
options.setAutomaticReconnect(true);
return options;
}
/**
* 連接 MQTT Broker,得到 MqttClient連接對象
*/
public void connect() throws MqttException {
if (client == null) {
client = new MqttClient(host, clientId, new MemoryPersistence());
// 設置回調
client.setCallback(new MyMqttCallback(MyMqttClient.this));
}
// 連接參數
MqttConnectOptions mqttConnectOptions = setMqttConnectOptions(username, password, timeout, keepalive);
if (!client.isConnected()) {
client.connect(mqttConnectOptions);
} else {
client.disconnect();
client.connect(mqttConnectOptions);
}
log.info("== MyMqttClient ==> MQTT connect success");//未發生異常,則連接成功
}
/**
* 發布,默認qos為0,非持久化
*
* @param pushMessage
* @param topic
*/
public void publish(String pushMessage, String topic) {
publish(pushMessage, topic, 0, false);
}
/**
* 發布消息
*
* @param pushMessage
* @param topic
* @param qos
* @param retained:留存
*/
public void publish(String pushMessage, String topic, int qos, boolean retained) {
MqttMessage message = new MqttMessage();
message.setPayload(pushMessage.getBytes());
message.setQos(qos);
message.setRetained(retained);
MqttTopic mqttTopic = MyMqttClient.getClient().getTopic(topic);
if (null == mqttTopic) {
log.error("== MyMqttClient ==> topic is not exist");
}
MqttDeliveryToken token;//Delivery:配送
synchronized (this) {//注意:這里一定要同步,否則,在多線程publish的情況下,線程會發生死鎖,分析見文章最后補充
try {
token = mqttTopic.publish(message);//也是發送到執行隊列中,等待執行線程執行,將消息發送到消息中間件
token.waitForCompletion(1000L);
} catch (MqttPersistenceException e) {
e.printStackTrace();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
/**
* 訂閱某個主題,qos默認為0
*
* @param topic
*/
public void subscribe(String topic) {
subscribe(topic, 0);
}
/**
* 訂閱某個主題
*
* @param topic
* @param qos
*/
public void subscribe(String topic, int qos) {
try {
MyMqttClient.getClient().subscribe(topic, qos);
} catch (MqttException e) {
e.printStackTrace();
}
log.info("== MyMqttClient ==> 訂閱主題成功:topic = {}, qos = {}", topic, qos);
}
/**
* 取消訂閱主題
*
* @param topic 主題名稱
*/
public void cleanTopic(String topic) {
if (client != null && client.isConnected()) {
try {
client.unsubscribe(topic);
} catch (MqttException e) {
e.printStackTrace();
}
} else {
log.error("== MyMqttClient ==> 取消訂閱失敗!");
}
log.info("== MyMqttClient ==> 取消訂閱主題成功:topic = {}", topic);
}
}
說明:
- MqttClient: 同步調用客戶端,使用阻塞方法通信。
- MqttClientPersistence: 代表一個持久的數據存儲,用于在傳輸過程中存儲出站和入站的信息,使其能夠傳遞到指定的 QoS。
- MqttConnectOptions: 連接選項,用于指定連接的參數,下面列舉一些常見的方法。
- setUserName: 設置用戶名
- setPassword: 設置密碼
- setCleanSession: 設置是否清除會話
- setKeepAliveInterval: 設置心跳間隔
- setConnectionTimeout: 設置連接超時時間
- setAutomaticReconnect: 設置是否自動重連
4、MqttClient回調類
創建一個 MqttClient回調類MyMqttCallback。
@Slf4j
public class MyMqttCallback implements MqttCallbackExtended {
//手動注入
private MqttConfig mqttConfig = SpringUtils.getBean(MqttConfig.class);
private MyMqttClient myMqttClient;
public MyMqttCallback(MyMqttClient myMqttClient) {
this.myMqttClient = myMqttClient;
}
/**
* MQTT Broker連接成功時被調用的方法。在該方法中可以執行 訂閱系統約定的主題(推薦使用)。
* 如果 MQTT Broker斷開連接之后又重新連接成功時,主題也需要再次訂閱,將重新訂閱主題放在連接成功后的回調方法中比較合理。
*
* @param reconnect
* @param serverURI MQTT Broker的url
*/
@Override
public void connectComplete(boolean reconnect, String serverURI) {
String connectMode = reconnect ? "重連" : "直連";
log.info("== MyMqttCallback ==> MQTT 連接成功,連接方式:{},serverURI:{}", connectMode, serverURI);
//訂閱主題
myMqttClient.subscribe(mqttConfig.topic1, 1);
myMqttClient.subscribe(mqttConfig.topic2, 1);
myMqttClient.subscribe(mqttConfig.topic3, 1);
List<String> topicList = new ArrayList<>();
topicList.add(mqttConfig.topic1);
topicList.add(mqttConfig.topic2);
topicList.add(mqttConfig.topic3);
log.info("== MyMqttCallback ==> 連接方式:{},訂閱主題成功,topic:{}", connectMode, topicList);
}
/**
* 丟失連接,可在這里做重連
* 只會調用一次
*
* @param throwable
*/
@Override
public void connectionLost(Throwable throwable) {
log.error("== MyMqttCallback ==> connectionLost 連接斷開,5S之后嘗試重連: {}", throwable.getMessage());
long reconnectTimes = 1;
while (true) {
try {
if (MyMqttClient.getClient().isConnected()) {
//判斷已經重新連接成功 需要重新訂閱主題 可以在這個if里面訂閱主題 或者 connectComplete(方法里面) 看你們自己選擇
log.warn("== MyMqttCallback ==> mqtt reconnect success end 重新連接 重新訂閱成功");
return;
}
reconnectTimes += 1;
log.warn("== MyMqttCallback ==> mqtt reconnect times = {} try again... mqtt重新連接時間 {}", reconnectTimes, reconnectTimes);
MyMqttClient.getClient().reconnect();
} catch (MqttException e) {
log.error("== MyMqttCallback ==> mqtt斷連異常", e);
}
try {
Thread.sleep(5000);
} catch (InterruptedException e1) {
}
}
}
/**
* 接收到消息(subscribe訂閱的主題消息)時被調用的方法
*
* @param topic
* @param mqttMessage
* @throws Exception 后得到的消息會執行到這里面
*/
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
log.info("== MyMqttCallback ==> messageArrived 接收消息主題: {},接收消息內容: {}", topic, new String(mqttMessage.getPayload()));
/**
* 根據訂閱的主題分別處理業務。可以通過if-else或者策略模式來分別處理不同的主題消息。
*/
//topic1主題
if (topic.equals("ABC")) {
Map maps = (Map) JSON.parse(new String(mqttMessage.getPayload(), StandardCharsets.UTF_8));
//TODO 業務處理
//doSomething1(maps);
log.info("== MyMqttCallback ==> messageArrived 接收消息主題: {},{}業務處理消息內容完成", topic, "TodoService1");
}
//topic2主題
if (topic.equals("A/b/1qaz")) {
Map maps = (Map) JSON.parse(new String(mqttMessage.getPayload(), StandardCharsets.UTF_8));
//TODO 業務處理
//doSomething2(maps);
log.info("== MyMqttCallback ==> messageArrived 接收消息主題: {},{}業務處理消息內容完成", topic, "TodoService2");
}
}
/**
* 消息發送(publish)完成時被調用的方法
*
* @param iMqttDeliveryToken
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.info("== MyMqttCallback ==> deliveryComplete 消息發送完成,Complete= {}", iMqttDeliveryToken.isComplete());
}
}
MqttCallback類方法說明:
- connectionLost(Throwable cause): 連接丟失時被調用
- messageArrived(String topic, MqttMessage message): 接收到消息時被調用
- deliveryComplete(IMqttDeliveryToken token): 消息發送完成時被調用
MqttCallbackExtended類方法說明:該類繼承MqttCallback類
- connectComplete(boolean reconnect, String serverURI): 連接丟失時被調用
4.1 SpringUtils工具類
@Component
public class SpringUtils implements BeanFactoryPostProcessor, ApplicationContextAware {
/**
* Spring應用上下文環境
*/
private static ConfigurableListableBeanFactory beanFactory;
private static ApplicationContext applicationContext;
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
SpringUtils.beanFactory = beanFactory;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
SpringUtils.applicationContext = applicationContext;
}
/**
* 獲取對象
*
* @param name
* @return Object 一個以所給名字注冊的bean的實例
* @throws org.springframework.beans.BeansException
*/
@SuppressWarnings("unchecked")
public static <T> T getBean(String name) throws BeansException {
return (T) beanFactory.getBean(name);
}
/**
* 獲取類型為requiredType的對象
*
* @param clz
* @return
* @throws org.springframework.beans.BeansException
*/
public static <T> T getBean(Class<T> clz) throws BeansException {
T result = (T) beanFactory.getBean(clz);
return result;
}
/**
* 如果BeanFactory包含一個與所給名稱匹配的bean定義,則返回true
*
* @param name
* @return boolean
*/
public static boolean containsBean(String name) {
return beanFactory.containsBean(name);
}
/**
* 判斷以給定名字注冊的bean定義是一個singleton還是一個prototype。 如果與給定名字相應的bean定義沒有被找到,將會拋出一個異常(NoSuchBeanDefinitionException)
*
* @param name
* @return boolean
* @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
*/
public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException {
return beanFactory.isSingleton(name);
}
/**
* @param name
* @return Class 注冊對象的類型
* @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
*/
public static Class<?> getType(String name) throws NoSuchBeanDefinitionException {
return beanFactory.getType(name);
}
/**
* 如果給定的bean名字在bean定義中有別名,則返回這些別名
*
* @param name
* @return
* @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
*/
public static String[] getAliases(String name) throws NoSuchBeanDefinitionException {
return beanFactory.getAliases(name);
}
/**
* 獲取aop代理對象
*
* @param invoker
* @return
*/
@SuppressWarnings("unchecked")
public static <T> T getAopProxy(T invoker) {
return (T) AopContext.currentProxy();
}
/**
* 獲取當前的環境配置,無配置返回null
*
* @return 當前的環境配置
*/
public static String[] getActiveProfiles() {
return applicationContext.getEnvironment().getActiveProfiles();
}
/**
* 獲取當前的環境配置,當有多個環境配置時,只獲取第一個
*
* @return 當前的環境配置
*/
public static String getActiveProfile() {
final String[] activeProfiles = getActiveProfiles();
if (activeProfiles == null) {
return null;
}
return activeProfiles[0];
}
}
到此,Springboot 通過 MqttClient整合操作 MQTT Broker就可以了。
二、操作MQTT
我們在 service層創建一個 MqttService類,業務通過 MqttService類來統一操作 MqttClient。
1、自定義發送消息載體類
這里創建一個 MyXxxMqttMsg類,來約定發送消息的載體類格式。
@Data
public class MyXxxMqttMsg implements Serializable {
private static final long serialVersionUID = -8303548938481407659L;
/**
* MD5值:MD5_lower(content + timestamp)
*/
private String md5;
/**
* 消息內容
*/
private String content = "";
/**
* 時間戳
*/
private Long timestamp;
}
2、MqttService類
1)接口:
public interface MqttService {
/**
* 添加訂閱主題
*
* @param topic 主題名稱
*/
void addTopic(String topic);
/**
* 取消訂閱主題
*
* @param topic 主題名稱
*/
void removeTopic(String topic);
/**
* 發布主題消息內容
*
* @param msgContent
* @param topic
*/
void publish(String msgContent, String topic);
}
2)實現類:
@Service
public class MqttServiceImpl implements MqttService {
@Autowired
private MyMqttClient myMqttClient;
@Override
public void addTopic(String topic) {
myMqttClient.subscribe(topic);
}
@Override
public void removeTopic(String topic) {
myMqttClient.cleanTopic(topic);
}
@Override
public void publish(String msgContent, String topic) {
//MyXxxMqttMsg 轉Json
MyXxxMqttMsg myXxxMqttMsg = new MyXxxMqttMsg();
myXxxMqttMsg.setContent(msgContent);
myXxxMqttMsg.setTimestamp(System.currentTimeMillis());
// TODO Md5值
myXxxMqttMsg.setMd5(UUID.randomUUID().toString());
String msgJson = JSON.toJSONString(myXxxMqttMsg);
//發布消息
myMqttClient.publish(msgJson, topic);
}
3、controller類
創建一個 MyMqttController類,來操作一下 MQTT。
@RestController
@RequestMapping("/mqtt")
@Api(value = "MyMqttController", tags = {"MQTT相關操作接口"})
public class MyMqttController {
@Autowired
private MqttService mqttService;
@GetMapping("/addTopic")
@ApiOperation(value = "添加訂閱主題接口")
public void addTopic(String topic) {
mqttService.addTopic(topic);
}
@GetMapping("/removeTopic")
@ApiOperation(value = "取消訂閱主題接口")
public void removeTopic(String topic) {
mqttService.removeTopic(topic);
}
@PostMapping("/removeTopic")
@ApiOperation(value = "發布主題消息內容接口")
public void removeTopic(String msgContent, String topic) {
mqttService.publish(msgContent, topic);
}
}
訂閱和取消主題操作:MQTTX發布了一個主題消息。
發布通配符主題消息:
– 求知若饑,虛心若愚。
原文鏈接:https://blog.csdn.net/qq_42402854/article/details/132791347
- 上一篇:沒有了
- 下一篇:沒有了
相關推薦
- 2022-04-04 iview在Table表格中渲染title文字提示,使用render實現
- 2022-06-09 FreeRTOS實時操作系統的多優先級實現_操作系統
- 2023-02-02 C#實現SMTP服務發送郵件的示例代碼_C#教程
- 2023-01-18 GO實現跳躍表的示例詳解_Golang
- 2022-02-04 ImportError: No module named ‘flask_sqlalchemy‘
- 2022-07-23 C#線程間通信的異步機制_C#教程
- 2022-04-22 自定義注解+Spel實現分布式鎖
- 2023-07-15 css背景顏色不顯示
- 欄目分類
-
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支