日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

SpringBoot整合MQTT(MqttClient)

作者:Charge8 更新時間: 2023-09-12 編程語言

一、SpringBoot整合MQTT

創建項目,引入 MQTT依賴:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.3.12.RELEASE</version>
        </dependency>

        <!-- spring-integration-mqtt依賴 -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>6.1.2</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.28</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>

1、yml配置文件

在 application.yml文件中,定義 MQTT連接信息。

## MQTT 基本連接參數 ##
mqtt:
  host: tcp://192.168.xxx.xxx:1883
  #  host: tcp://broker.emqx.io:1883
  userName: admin
  passWord: xxxxxx
  qos: 1
  clientId: ClientId_local #ClientId_local必須唯一。
  timeout: 10 # 超時時間
  keepalive: 30 # 保持連接時間
  clearSession: true   # 清除會話(設置為false,斷開連接,重連后使用原來的會話 保留訂閱的主題,能接收離線期間的消息)
  topic1: A/b/#  # 通配符主題只能用于訂閱,不能用于發布。+:表示單層通配符,#:表示多層通配符
  topic2: A/abc
  topic3: ABC

2、MQTT配置類

創建一個 MqttConfig配置類,并獲取配置文件的 MQTT的連接參數。創建 MyMqttClient類注入Spring。

@Slf4j
@Configuration
public class MqttConfig {

    @Value("${mqtt.host}")
    public String host;
    @Value("${mqtt.username}")
    public String username;
    @Value("${mqtt.password}")
    public String password;
    @Value("${mqtt.clientId}")
    public String clientId;
    @Value("${mqtt.timeout}")
    public int timeOut;
    @Value("${mqtt.keepalive}")
    public int keepAlive;

    @Value("${mqtt.clearSession}")
    public boolean clearSession;
    @Value("${mqtt.topic1}")
    public String topic1;
    @Value("${mqtt.topic2}")
    public String topic2;
    @Value("${mqtt.topic3}")
    public String topic3;

    @Bean//注入Spring
    public MyMqttClient myMqttClient() {
        MyMqttClient myMqttClient = new MyMqttClient(host, username, password, clientId, timeOut, keepAlive, clearSession);
        for (int i = 0; i < 10; i++) {
            try {
                myMqttClient.connect();
                // 這里可以訂閱主題,推薦放到 MqttCallbackExtended.connectComplete方法中
                //myMqttClient.subscribe("ABC", 1);
                return myMqttClient;
            } catch (MqttException e) {
                log.error("== MqttConfig ==> MQTT connect exception, connect time = {}", i);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
        }
        return myMqttClient;
    }

}

3、MQTT 客戶端封裝類

創建 MQTT 客戶端封裝類MyMqttClient。對 MQTT Broker進行操作。

@Slf4j
public class MyMqttClient {

    /**
     * MQTT Broker 基本連接參數,用戶名、密碼為非必選參數
     */
    private String host;
    private String username;
    private String password;
    private String clientId;
    private int timeout;
    private int keepalive;
    private boolean clearSession;

    /**
     * MQTT 客戶端
     */
    private static MqttClient client;

    public static MqttClient getClient() {
        return client;
    }

    public static void setClient(MqttClient client) {
        MyMqttClient.client = client;
    }

    public MyMqttClient(String host, String username, String password, String clientId, int timeOut, int keepAlive, boolean clearSession) {
        this.host = host;
        this.username = username;
        this.password = password;
        this.clientId = clientId;
        this.timeout = timeOut;
        this.keepalive = keepAlive;
        this.clearSession = clearSession;
    }

    /**
     * 設置 MQTT Broker 基本連接參數
     *
     * @param username
     * @param password
     * @param timeout
     * @param keepalive
     * @return
     */
    public MqttConnectOptions setMqttConnectOptions(String username, String password, int timeout, int keepalive) {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setConnectionTimeout(timeout);
        options.setKeepAliveInterval(keepalive);
        options.setCleanSession(clearSession);
        options.setAutomaticReconnect(true);
        return options;
    }

    /**
     * 連接 MQTT Broker,得到 MqttClient連接對象
     */
    public void connect() throws MqttException {
        if (client == null) {
            client = new MqttClient(host, clientId, new MemoryPersistence());
            // 設置回調
            client.setCallback(new MyMqttCallback(MyMqttClient.this));
        }
        // 連接參數
        MqttConnectOptions mqttConnectOptions = setMqttConnectOptions(username, password, timeout, keepalive);
        if (!client.isConnected()) {
            client.connect(mqttConnectOptions);
        } else {
            client.disconnect();
            client.connect(mqttConnectOptions);
        }
        log.info("== MyMqttClient ==> MQTT connect success");//未發生異常,則連接成功
    }

    /**
     * 發布,默認qos為0,非持久化
     *
     * @param pushMessage
     * @param topic
     */
    public void publish(String pushMessage, String topic) {
        publish(pushMessage, topic, 0, false);
    }

    /**
     * 發布消息
     *
     * @param pushMessage
     * @param topic
     * @param qos
     * @param retained:留存
     */
    public void publish(String pushMessage, String topic, int qos, boolean retained) {
        MqttMessage message = new MqttMessage();
        message.setPayload(pushMessage.getBytes());
        message.setQos(qos);
        message.setRetained(retained);
        MqttTopic mqttTopic = MyMqttClient.getClient().getTopic(topic);
        if (null == mqttTopic) {
            log.error("== MyMqttClient ==> topic is not exist");
        }
        MqttDeliveryToken token;//Delivery:配送
        synchronized (this) {//注意:這里一定要同步,否則,在多線程publish的情況下,線程會發生死鎖,分析見文章最后補充
            try {
                token = mqttTopic.publish(message);//也是發送到執行隊列中,等待執行線程執行,將消息發送到消息中間件
                token.waitForCompletion(1000L);
            } catch (MqttPersistenceException e) {
                e.printStackTrace();
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 訂閱某個主題,qos默認為0
     *
     * @param topic
     */
    public void subscribe(String topic) {
        subscribe(topic, 0);
    }

    /**
     * 訂閱某個主題
     *
     * @param topic
     * @param qos
     */
    public void subscribe(String topic, int qos) {
        try {
            MyMqttClient.getClient().subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
        log.info("== MyMqttClient ==> 訂閱主題成功:topic = {}, qos = {}", topic, qos);
    }


    /**
     * 取消訂閱主題
     *
     * @param topic 主題名稱
     */
    public void cleanTopic(String topic) {
        if (client != null && client.isConnected()) {
            try {
                client.unsubscribe(topic);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        } else {
            log.error("== MyMqttClient ==> 取消訂閱失敗!");
        }
        log.info("== MyMqttClient ==> 取消訂閱主題成功:topic = {}", topic);
    }

}

說明:

  • MqttClient: 同步調用客戶端,使用阻塞方法通信。
  • MqttClientPersistence: 代表一個持久的數據存儲,用于在傳輸過程中存儲出站和入站的信息,使其能夠傳遞到指定的 QoS。
  • MqttConnectOptions: 連接選項,用于指定連接的參數,下面列舉一些常見的方法。
    • setUserName: 設置用戶名
    • setPassword: 設置密碼
    • setCleanSession: 設置是否清除會話
    • setKeepAliveInterval: 設置心跳間隔
    • setConnectionTimeout: 設置連接超時時間
    • setAutomaticReconnect: 設置是否自動重連

4、MqttClient回調類

創建一個 MqttClient回調類MyMqttCallback。

@Slf4j
public class MyMqttCallback implements MqttCallbackExtended {

    //手動注入
    private MqttConfig mqttConfig = SpringUtils.getBean(MqttConfig.class);


    private MyMqttClient myMqttClient;

    public MyMqttCallback(MyMqttClient myMqttClient) {
        this.myMqttClient = myMqttClient;
    }

    /**
     * MQTT Broker連接成功時被調用的方法。在該方法中可以執行 訂閱系統約定的主題(推薦使用)。
     * 如果 MQTT Broker斷開連接之后又重新連接成功時,主題也需要再次訂閱,將重新訂閱主題放在連接成功后的回調方法中比較合理。
     *
     * @param reconnect
     * @param serverURI MQTT Broker的url
     */
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        String connectMode = reconnect ? "重連" : "直連";
        log.info("== MyMqttCallback ==> MQTT 連接成功,連接方式:{},serverURI:{}", connectMode, serverURI);
        //訂閱主題
        myMqttClient.subscribe(mqttConfig.topic1, 1);
        myMqttClient.subscribe(mqttConfig.topic2, 1);
        myMqttClient.subscribe(mqttConfig.topic3, 1);

        List<String> topicList = new ArrayList<>();
        topicList.add(mqttConfig.topic1);
        topicList.add(mqttConfig.topic2);
        topicList.add(mqttConfig.topic3);
        log.info("== MyMqttCallback ==> 連接方式:{},訂閱主題成功,topic:{}", connectMode, topicList);
    }


    /**
     * 丟失連接,可在這里做重連
     * 只會調用一次
     *
     * @param throwable
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.error("== MyMqttCallback ==> connectionLost 連接斷開,5S之后嘗試重連: {}", throwable.getMessage());
        long reconnectTimes = 1;
        while (true) {
            try {
                if (MyMqttClient.getClient().isConnected()) {
                    //判斷已經重新連接成功  需要重新訂閱主題 可以在這個if里面訂閱主題  或者 connectComplete(方法里面)  看你們自己選擇
                    log.warn("== MyMqttCallback ==> mqtt reconnect success end  重新連接  重新訂閱成功");
                    return;
                }
                reconnectTimes += 1;
                log.warn("== MyMqttCallback ==> mqtt reconnect times = {} try again...  mqtt重新連接時間 {}", reconnectTimes, reconnectTimes);
                MyMqttClient.getClient().reconnect();
            } catch (MqttException e) {
                log.error("== MyMqttCallback ==> mqtt斷連異常", e);
            }
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e1) {
            }
        }
    }

    /**
     * 接收到消息(subscribe訂閱的主題消息)時被調用的方法
     *
     * @param topic
     * @param mqttMessage
     * @throws Exception 后得到的消息會執行到這里面
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        log.info("== MyMqttCallback ==> messageArrived 接收消息主題: {},接收消息內容: {}", topic, new String(mqttMessage.getPayload()));
        /**
         * 根據訂閱的主題分別處理業務。可以通過if-else或者策略模式來分別處理不同的主題消息。
         */
        //topic1主題
        if (topic.equals("ABC")) {
            Map maps = (Map) JSON.parse(new String(mqttMessage.getPayload(), StandardCharsets.UTF_8));
            //TODO 業務處理
            //doSomething1(maps);
            log.info("== MyMqttCallback ==> messageArrived 接收消息主題: {},{}業務處理消息內容完成", topic, "TodoService1");
        }
        //topic2主題
        if (topic.equals("A/b/1qaz")) {
            Map maps = (Map) JSON.parse(new String(mqttMessage.getPayload(), StandardCharsets.UTF_8));
            //TODO 業務處理
            //doSomething2(maps);
            log.info("== MyMqttCallback ==> messageArrived 接收消息主題: {},{}業務處理消息內容完成", topic, "TodoService2");
        }
    }

    /**
     * 消息發送(publish)完成時被調用的方法
     *
     * @param iMqttDeliveryToken
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info("== MyMqttCallback ==> deliveryComplete 消息發送完成,Complete= {}", iMqttDeliveryToken.isComplete());
    }

}

MqttCallback類方法說明:

  • connectionLost(Throwable cause): 連接丟失時被調用
  • messageArrived(String topic, MqttMessage message): 接收到消息時被調用
  • deliveryComplete(IMqttDeliveryToken token): 消息發送完成時被調用

MqttCallbackExtended類方法說明:該類繼承MqttCallback類

  • connectComplete(boolean reconnect, String serverURI): 連接丟失時被調用

4.1 SpringUtils工具類

@Component
public class SpringUtils implements BeanFactoryPostProcessor, ApplicationContextAware {
    /**
     * Spring應用上下文環境
     */
    private static ConfigurableListableBeanFactory beanFactory;

    private static ApplicationContext applicationContext;

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        SpringUtils.beanFactory = beanFactory;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        SpringUtils.applicationContext = applicationContext;
    }

    /**
     * 獲取對象
     *
     * @param name
     * @return Object 一個以所給名字注冊的bean的實例
     * @throws org.springframework.beans.BeansException
     */
    @SuppressWarnings("unchecked")
    public static <T> T getBean(String name) throws BeansException {
        return (T) beanFactory.getBean(name);
    }

    /**
     * 獲取類型為requiredType的對象
     *
     * @param clz
     * @return
     * @throws org.springframework.beans.BeansException
     */
    public static <T> T getBean(Class<T> clz) throws BeansException {
        T result = (T) beanFactory.getBean(clz);
        return result;
    }

    /**
     * 如果BeanFactory包含一個與所給名稱匹配的bean定義,則返回true
     *
     * @param name
     * @return boolean
     */
    public static boolean containsBean(String name) {
        return beanFactory.containsBean(name);
    }

    /**
     * 判斷以給定名字注冊的bean定義是一個singleton還是一個prototype。 如果與給定名字相應的bean定義沒有被找到,將會拋出一個異常(NoSuchBeanDefinitionException)
     *
     * @param name
     * @return boolean
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     */
    public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException {
        return beanFactory.isSingleton(name);
    }

    /**
     * @param name
     * @return Class 注冊對象的類型
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     */
    public static Class<?> getType(String name) throws NoSuchBeanDefinitionException {
        return beanFactory.getType(name);
    }

    /**
     * 如果給定的bean名字在bean定義中有別名,則返回這些別名
     *
     * @param name
     * @return
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     */
    public static String[] getAliases(String name) throws NoSuchBeanDefinitionException {
        return beanFactory.getAliases(name);
    }

    /**
     * 獲取aop代理對象
     *
     * @param invoker
     * @return
     */
    @SuppressWarnings("unchecked")
    public static <T> T getAopProxy(T invoker) {
        return (T) AopContext.currentProxy();
    }

    /**
     * 獲取當前的環境配置,無配置返回null
     *
     * @return 當前的環境配置
     */
    public static String[] getActiveProfiles() {
        return applicationContext.getEnvironment().getActiveProfiles();
    }

    /**
     * 獲取當前的環境配置,當有多個環境配置時,只獲取第一個
     *
     * @return 當前的環境配置
     */
    public static String getActiveProfile() {
        final String[] activeProfiles = getActiveProfiles();
        if (activeProfiles == null) {
            return null;
        }
        return activeProfiles[0];
    }

}

到此,Springboot 通過 MqttClient整合操作 MQTT Broker就可以了。

二、操作MQTT

我們在 service層創建一個 MqttService類,業務通過 MqttService類來統一操作 MqttClient。

1、自定義發送消息載體類

這里創建一個 MyXxxMqttMsg類,來約定發送消息的載體類格式。

@Data
public class MyXxxMqttMsg implements Serializable {

    private static final long serialVersionUID = -8303548938481407659L;

    /**
     * MD5值:MD5_lower(content + timestamp)
     */
    private String md5;

    /**
     * 消息內容
     */
    private String content = "";

    /**
     * 時間戳
     */
    private Long timestamp;


}

2、MqttService類

1)接口:

public interface MqttService {

    /**
     * 添加訂閱主題
     *
     * @param topic 主題名稱
     */
    void addTopic(String topic);

    /**
     * 取消訂閱主題
     *
     * @param topic 主題名稱
     */
    void removeTopic(String topic);

    /**
     * 發布主題消息內容
     *
     * @param msgContent
     * @param topic
     */
    void publish(String msgContent, String topic);

}

2)實現類:

@Service
public class MqttServiceImpl implements MqttService {

    @Autowired
    private MyMqttClient myMqttClient;

    @Override
    public void addTopic(String topic) {
        myMqttClient.subscribe(topic);
    }

    @Override
    public void removeTopic(String topic) {
        myMqttClient.cleanTopic(topic);
    }

    @Override
    public void publish(String msgContent, String topic) {
        //MyXxxMqttMsg 轉Json
        MyXxxMqttMsg myXxxMqttMsg = new MyXxxMqttMsg();
        myXxxMqttMsg.setContent(msgContent);
        myXxxMqttMsg.setTimestamp(System.currentTimeMillis());
        // TODO Md5值
        myXxxMqttMsg.setMd5(UUID.randomUUID().toString());
        String msgJson = JSON.toJSONString(myXxxMqttMsg);

        //發布消息
        myMqttClient.publish(msgJson, topic);
    }

3、controller類

創建一個 MyMqttController類,來操作一下 MQTT。

@RestController
@RequestMapping("/mqtt")
@Api(value = "MyMqttController", tags = {"MQTT相關操作接口"})
public class MyMqttController {
    @Autowired
    private MqttService mqttService;

    @GetMapping("/addTopic")
    @ApiOperation(value = "添加訂閱主題接口")
    public void addTopic(String topic) {
        mqttService.addTopic(topic);
    }

    @GetMapping("/removeTopic")
    @ApiOperation(value = "取消訂閱主題接口")
    public void removeTopic(String topic) {
        mqttService.removeTopic(topic);
    }

    @PostMapping("/removeTopic")
    @ApiOperation(value = "發布主題消息內容接口")
    public void removeTopic(String msgContent, String topic) {
        mqttService.publish(msgContent, topic);
    }

}

在這里插入圖片描述

訂閱和取消主題操作:MQTTX發布了一個主題消息。

在這里插入圖片描述

發布通配符主題消息:

在這里插入圖片描述

– 求知若饑,虛心若愚。

原文鏈接:https://blog.csdn.net/qq_42402854/article/details/132791347

  • 上一篇:沒有了
  • 下一篇:沒有了
欄目分類
最近更新