網站首頁 編程語言 正文
MQTT
MQTT是一個極其輕量級的發布/訂閱消息傳輸協議,對于需要較小代碼占用空間或網絡帶寬非常寶貴的遠程連接非常有用
有如下特點:
- 開放消息協議,簡單易實現;
- 發布訂閱模式,一對多消息發布;
- 基于TCP/IP網絡連接,提供有序,無損,雙向連接;
- 1字節固定報頭,2字節心跳報文,最小化傳輸開銷和協議交換,有效減少網絡流量;
- 消息QoS支持,可靠傳輸保證。
添加依賴
? ? ? ? maven { url "https://repo.eclipse.org/content/repositories/paho-snapshots/" }
? ? implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
? ? implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
首先,連接服務器,SERVER_HOST為主機名,CLIENT_ID為客戶端唯一標識,還需要用戶名和密碼,當然,這些一般由服務器返回。
fun connect() {
try {
//MemoryPersistence設置clientId的保存形式,默認為以內存保存
client = MqttAsyncClient(SERVER_HOST, CLIENT_ID, MemoryPersistence())
//MQTT連接設置
options = MqttConnectOptions()
with(options) {
//是否清空session,true表示每次連接到服務器都以新的身份,false表示服務器會保留客戶的連接記錄
isCleanSession = true
//用戶名和密碼
userName = USERNAME
password = PASSWORD.toCharArray()
//超時時間,單位是秒
connectionTimeout = 30
//會話心跳時間,單位是秒,服務器每隔30秒向客戶端發送消息判斷客戶端是否在線
keepAliveInterval = 30
}
//設置回調
client!!.setCallback(PushCallBack())
client!!.connect(options, context, iMqttActionListener)
} catch (e: MqttException) {
e.printStackTrace()
}
}
心跳包:在空閑時間內,如果客戶端沒有其他任何報文發送,必須發送一個PINGREQ報文到服務器,而如果服務端在 1.5 倍心跳時間內沒有收到客戶端消息,則會主動斷開客戶端的連接,發送其遺囑消息給所有訂閱者,而服務端收到PINGREQ報文之后,立即返回PINGRESP報文給客戶端。
如果兩個客戶端的 clientID 一樣,那么服務端記錄第一個客戶端連接之后再收到第二個客戶端連接請求,則會向第一個客戶端發送 Disconnect 報文斷開連接,并連接第二個客戶端。
遺囑消息:MqttConnectOptions的setWill方法可以設置遺囑消息,客戶端沒有主動向服務端發起disconnect斷開連接消息,但服務端檢測到和客戶端的連接已斷開,此時服務端將該客戶端設置的遺囑消息發送出去,遺囑Topic中不能存在通配符。
應用場景:客戶端掉線之后,可以及時通知到所有訂閱該遺囑topic的客戶端。
訂閱消息。
fun subscribeMessage(topic: String, qos: Int) {
client?.let {
try {
it.subscribe(topic, qos)
} catch (e: MqttException) {
e.printStackTrace()
}
}
}
MQTT是一種發布/訂閱的消息協議, 通過設定的主題topic,發布者向topic發送的payload負載消息會經過服務器,轉發到所有訂閱該topic的訂閱者。topic有兩個通配符,“+” 和 “#”,與 “/” 組合使用,“+” 只能表示一級topic,“#” 可以表示任意層級,例如,訂閱topic為 “guangdong/+”,發布者發布的topic可以是 guangdong,guangdong/shenzhen,但是不能是guangdong/shenzhen/nanshan,而訂閱的topic如果是 “guangdong/#” 則可以收到。
Qos為服務質量等級,qos=0,表示發送方只發一次,不管是否發成功,也不管接收方是否成功接收,適用于不重要的數據傳輸;qos=1,表示消息至少有一次到達接收方,發送方發送消息,需要等待接收方返回應答消息,如果發送方在一定時間內未收到應答,發送方將繼續發送,直到收到應答消息,刪除本地消息緩存,不再發送。適用于需要收到所有消息,客戶端可以處理重復消息;qos = 2:確保消息只一次到達接收方,一般我們都會選擇2。
發布消息
fun publish(topic: String, msg: String, isRetained: Boolean, qos: Int) {
try {
client?.let {
val message = MqttMessage()
message.qos = qos
message.isRetained = isRetained
message.payload = msg.toByteArray()
it.publish(topic, message)
}
} catch (e: MqttPersistenceException) {
e.printStackTrace()
} catch (e: MqttException) {
e.printStackTrace()
}
}
payload為負載消息,字節流類型,是 MQTT 通信傳輸的真實數據。retain是保留信息,服務端將保留對應topic最新的一條消息記錄,保留消息的作用是每次客戶端連上都會收到其topic的最后一條保留消息。
整個封裝類如下:
class MQTTManager private constructor(private val context: Context) {
private var client: MqttAsyncClient? = null
private lateinit var options: MqttConnectOptions
private val iMqttActionListener = object : IMqttActionListener {
override fun onSuccess(asyncActionToken: IMqttToken?) {
//連接成功,開始訂閱
subscribeMessage(TOPIC, 2)
}
override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
//連接失敗
}
}
companion object {
@Volatile
private var instance: MQTTManager? = null
fun getInstance(context: Context): MQTTManager = instance ?: synchronized(this) {
instance ?: MQTTManager(context).also {
instance = it
}
}
}
/**
* 連接服務器
*/
fun connect() {
try {
//MemoryPersistence設置clientId的保存形式,默認為以內存保存
client = MqttAsyncClient(SERVER_HOST, CLIENT_ID, MemoryPersistence())
//MQTT連接設置
options = MqttConnectOptions()
with(options) {
//是否清空session,true表示每次連接到服務器都以新的身份,false表示服務器會保留客戶的連接記錄
isCleanSession = true
//用戶名和密碼
userName = USERNAME
password = PASSWORD.toCharArray()
//超時時間,單位是秒
connectionTimeout = 30
//會話心跳時間,單位是秒,服務器每隔30秒向客戶端發送消息判斷客戶端是否在線
keepAliveInterval = 30
//自動重連
isAutomaticReconnect = true
}
//設置回調
client!!.setCallback(PushCallBack())
client!!.connect(options, context, iMqttActionListener)
} catch (e: MqttException) {
e.printStackTrace()
}
}
/**
* 訂閱消息
*/
fun subscribeMessage(topic: String, qos: Int) {
client?.let {
try {
it.subscribe(topic, qos)
} catch (e: MqttException) {
e.printStackTrace()
}
}
}
/**
* 發布消息
*/
fun publish(topic: String, msg: String, isRetained: Boolean, qos: Int) {
try {
client?.let {
val message = MqttMessage()
message.qos = qos
message.isRetained = isRetained
message.payload = msg.toByteArray()
it.publish(topic, message)
}
} catch (e: MqttPersistenceException) {
e.printStackTrace()
} catch (e: MqttException) {
e.printStackTrace()
}
}
/**
* 斷開連接
*/
fun disconnect() {
client?.takeIf {
it.isConnected
}?.let {
try {
it.disconnect()
instance = null
} catch (e: MqttException) {
e.printStackTrace()
}
}
}
/**
* 判斷是否連接
*/
fun isConnected() = if (client != null) client!!.isConnected else false
inner class PushCallBack : MqttCallback {
override fun connectionLost(cause: Throwable?) {
//斷開連接
}
override fun messageArrived(topic: String?, message: MqttMessage?) {
//接收消息回調
}
override fun deliveryComplete(token: IMqttDeliveryToken?) {
//發布消息完成后的回調
}
}
}
WebSocket
WebSocket是應用層的一種協議,是建立在TCP協議基礎上的,主要特點就是全雙工通信,允許服務器主動發送信息給客戶端。
這里使用OKHTTP進行WebSocket開發。
class WebSocketManager private constructor() {
private val client: OkHttpClient = OkHttpClient.Builder().writeTimeout(5, TimeUnit.SECONDS)
.readTimeout(5, TimeUnit.SECONDS)
.connectTimeout(5, TimeUnit.SECONDS)
.build()
private var webSocket: WebSocket? = null
companion object {
val instance: WebSocketManager by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) { WebSocketManager() }
}
fun connect(url: String) {
webSocket?.cancel()
val request = Request.Builder().url(url).build()
webSocket = client.newWebSocket(request, object : WebSocketListener() {
//連接成功后回調
override fun onOpen(webSocket: WebSocket, response: Response) {
super.onOpen(webSocket, response)
}
//服務器發送消息給客戶端時回調
override fun onMessage(webSocket: WebSocket, text: String) {
super.onMessage(webSocket, text)
}
//服務器發送的byte類型消息
override fun onMessage(webSocket: WebSocket, bytes: ByteString) {
super.onMessage(webSocket, bytes)
}
//服務器連接關閉中
override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
super.onClosing(webSocket, code, reason)
}
//服務器連接已關閉
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
super.onClosed(webSocket, code, reason)
}
//服務器連接失敗
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
super.onFailure(webSocket, t, response)
}
})
}
//發送消息
private fun sendMessage(message: String) {
webSocket?.send(message)
}
private fun close(code: Int, reason: String) {
webSocket?.close(code, reason)
}
}
總結
WebSocket是一種簡單的報文協議,著重解決客戶端與服務端不能雙向通信的問題。
MQTT是基于TCP的發布/訂閱消息傳輸協議,客戶端可以創建和訂閱任意主題,并向主題發布或接收消息,此外,有許多為物聯網優化的特性,比如服務質量等級Qos,層級主題,遺囑信息等。
MQTT和WebSocket都是應用層協議,都使用TCP協議來確保可靠傳輸,都支持雙向通信,都使用二進制編碼。WebSocket更簡單靈活,MQTT相對復雜,但功能強大,大家可以根據自己的使用場景來選擇 。
原文鏈接:https://blog.csdn.net/qq_45485851/article/details/126055945
相關推薦
- 2023-11-19 DOCKER權限問題:權限不夠Got permission denied while trying
- 2022-06-22 Android開發保存QQ密碼功能_Android
- 2022-04-27 Python線程之線程安全的隊列Queue_python
- 2022-08-05 RedisConfig 配置文件
- 2022-06-30 C語言從基礎到進階全面講解數組_C 語言
- 2022-04-26 jquery實現表格行拖動排序_jquery
- 2022-05-22 Nginx設置HTTPS的方法步驟_nginx
- 2022-01-20 簡易登錄表單的制作,包括用戶名、密碼、隨機驗證碼(代碼完整,復制即用)
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支