網(wǎng)站首頁 編程語言 正文
golang gin 監(jiān)聽rabbitmq隊(duì)列無限消費(fèi)
連接rabbitmq
package database import ( "github.com/streadway/amqp" "log" "reflect" "yy-data-processing/common/config" ) var RabbitConn *amqp.Connection var RabbitChannel *amqp.Channel func InitRabbitmq() { var err error RabbitConn, err = amqp.Dial(config.Config.RabbitUrl) if err != nil { log.Println("連接RabbitMQ失敗") panic(err) } RabbitChannel, err = RabbitConn.Channel() if err != nil { log.Println("獲取RabbitMQ channel失敗") panic(err) } } // 0表示channel未關(guān)閉,1表示channel已關(guān)閉 func CheckRabbitClosed(ch amqp.Channel) int64 { d := reflect.ValueOf(ch) i := d.FieldByName("closed").Int() return i }
創(chuàng)建生產(chǎn)者
package service import ( "encoding/json" "github.com/streadway/amqp" "log" "yy-data-processing/common/config" "yy-data-processing/common/database" "yy-data-processing/model" ) func Producer() { // 聲明隊(duì)列,沒有則創(chuàng)建 // 隊(duì)列名稱、是否持久化、所有消費(fèi)者與隊(duì)列斷開時(shí)是否自動(dòng)刪除隊(duì)列、是否獨(dú)享(不同連接的channel能否使用該隊(duì)列) declare, err := database.RabbitChannel.QueueDeclare(config.Config.HawkSaveQueueName, true, false, false, false, nil) if err != nil { log.Printf("聲明隊(duì)列 %v 失敗, error: %v", config.Config.HawkSaveQueueName, err) panic(err) } request := model.Request{} marshal, _ := json.Marshal(request ) // exchange、routing key、mandatory、immediate err = database.RabbitChannel.Publish("", declare.Name, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(marshal), }) if err != nil { log.Printf("生產(chǎn)者發(fā)送消息失敗, error: %v", err) } else { log.Println("生產(chǎn)者發(fā)送消息成功") } }
創(chuàng)建消費(fèi)者
package service import ( "encoding/json" "log" "os" "strings" "sync" "time" "yy-data-processing/common/config" "yy-data-processing/common/database" "yy-data-processing/model" ) func Consumer() { // 聲明隊(duì)列,沒有則創(chuàng)建 // 隊(duì)列名稱、是否持久化、所有消費(fèi)者與隊(duì)列斷開時(shí)是否自動(dòng)刪除隊(duì)列、是否獨(dú)享(不同連接的channel能否使用該隊(duì)列) _, err := database.RabbitChannel.QueueDeclare(config.Config.QueueName, true, false, false, false, nil) if err != nil { log.Printf("聲明隊(duì)列 %v 失敗, error: %v", config.Config.QueueName, err) panic(err) } // 隊(duì)列名稱、consumer、auto-ack、是否獨(dú)享 // deliveries是一個(gè)管道,有消息到隊(duì)列,就會(huì)消費(fèi),消費(fèi)者的消息只需要從deliveries這個(gè)管道獲取 deliveries, err := database.RabbitChannel.Consume(config.Config.QueueName, "", true, false, false, false, nil) if err != nil { log.Printf("從隊(duì)列 %v 獲取數(shù)據(jù)失敗, error: %v", config.Config.QueueName, err) } else { log.Println("從消費(fèi)隊(duì)列獲取任務(wù)成功") } // 阻塞住 for { select { case message := <-deliveries: closed := database.CheckRabbitClosed(*database.RabbitChannel) if closed == 1 { // channel 已關(guān)閉,重連一下 database.InitRabbitmq() } else { msgData := string(message.Body) request := model.Request{} err := json.Unmarshal([]byte(msgData), &request) if err != nil { log.Printf("解析rabbitmq數(shù)據(jù) %v 失敗, error: %v", msgData, err) } else { // TODO... // 處理邏輯 } } } } }
main方法協(xié)程調(diào)用
package main import ( "log" "yy-data-processing/common/config" "yy-data-processing/common/database" "yy-data-processing/router" "yy-data-processing/service" ) func main() { // 初始化路由 routers := router.InitRouters() // 初始化RabbitMQ database.InitRabbitmq() go service.Producer() go service.Consumer() port := config.Config.Port if err := routers.Run(":" + port); err != nil { log.Printf("啟動(dòng)服務(wù)失敗: ", err) } }
原文鏈接:https://blog.csdn.net/u013071014/article/details/128016871
相關(guān)推薦
- 2022-08-19 存儲(chǔ)引擎的應(yīng)用場(chǎng)景
- 2024-07-18 Spring Security概述快速入門
- 2022-10-22 Kotlin淺析null操作方法_Android
- 2023-03-29 PyTorch中g(shù)rid_sample的使用及說明_python
- 2022-09-23 Python線程threading(Thread類)_python
- 2022-05-18 python必備庫Matplotlib畫圖神器_python
- 2022-07-20 使用numpy.ndarray添加元素_python
- 2022-09-29 DevExpress的DateEdit設(shè)置顯示日期和時(shí)間的方法_C#教程
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲(chǔ)小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯(cuò)誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實(shí)現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡單動(dòng)態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對(duì)象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊(duì)列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支