日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學(xué)無(wú)先后,達(dá)者為師

網(wǎng)站首頁(yè) 編程語(yǔ)言 正文

golang?gin?監(jiān)聽(tīng)rabbitmq隊(duì)列無(wú)限消費(fèi)的案例代碼_Golang

作者:lj907722644 ? 更新時(shí)間: 2022-12-28 編程語(yǔ)言

golang gin 監(jiān)聽(tīng)rabbitmq隊(duì)列無(wú)限消費(fèi)

連接rabbitmq

package database

import (
	"github.com/streadway/amqp"
	"log"
	"reflect"
	"yy-data-processing/common/config"
)

var RabbitConn *amqp.Connection
var RabbitChannel *amqp.Channel

func InitRabbitmq() {
	var err error
	RabbitConn, err = amqp.Dial(config.Config.RabbitUrl)
	if err != nil {
		log.Println("連接RabbitMQ失敗")
		panic(err)
	}
	RabbitChannel, err = RabbitConn.Channel()
	if err != nil {
		log.Println("獲取RabbitMQ channel失敗")
		panic(err)
	}
}

// 0表示channel未關(guān)閉,1表示channel已關(guān)閉
func CheckRabbitClosed(ch amqp.Channel) int64 {
	d := reflect.ValueOf(ch)
	i := d.FieldByName("closed").Int()
	return i
}

創(chuàng)建生產(chǎn)者

package service

import (
	"encoding/json"
	"github.com/streadway/amqp"
	"log"
	"yy-data-processing/common/config"
	"yy-data-processing/common/database"
	"yy-data-processing/model"
)

func Producer() {
	// 聲明隊(duì)列,沒(méi)有則創(chuàng)建
	// 隊(duì)列名稱、是否持久化、所有消費(fèi)者與隊(duì)列斷開時(shí)是否自動(dòng)刪除隊(duì)列、是否獨(dú)享(不同連接的channel能否使用該隊(duì)列)
	declare, err := database.RabbitChannel.QueueDeclare(config.Config.HawkSaveQueueName, true, false, false, false, nil)
	if err != nil {
		log.Printf("聲明隊(duì)列 %v 失敗, error: %v", config.Config.HawkSaveQueueName, err)
		panic(err)
	}

	request := model.Request{}
	marshal, _ := json.Marshal(request )
	// exchange、routing key、mandatory、immediate
	err = database.RabbitChannel.Publish("", declare.Name, false, false, amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte(marshal),
	})
	if err != nil {
		log.Printf("生產(chǎn)者發(fā)送消息失敗, error: %v", err)
	} else {
		log.Println("生產(chǎn)者發(fā)送消息成功")
	}
}

創(chuàng)建消費(fèi)者

package service

import (
	"encoding/json"
	"log"
	"os"
	"strings"
	"sync"
	"time"
	"yy-data-processing/common/config"
	"yy-data-processing/common/database"
	"yy-data-processing/model"
)

func Consumer() {
	// 聲明隊(duì)列,沒(méi)有則創(chuàng)建
	// 隊(duì)列名稱、是否持久化、所有消費(fèi)者與隊(duì)列斷開時(shí)是否自動(dòng)刪除隊(duì)列、是否獨(dú)享(不同連接的channel能否使用該隊(duì)列)
	_, err := database.RabbitChannel.QueueDeclare(config.Config.QueueName, true, false, false, false, nil)
	if err != nil {
		log.Printf("聲明隊(duì)列 %v 失敗, error: %v", config.Config.QueueName, err)
		panic(err)
	}
    
	// 隊(duì)列名稱、consumer、auto-ack、是否獨(dú)享
	// deliveries是一個(gè)管道,有消息到隊(duì)列,就會(huì)消費(fèi),消費(fèi)者的消息只需要從deliveries這個(gè)管道獲取
	deliveries, err := database.RabbitChannel.Consume(config.Config.QueueName, "", true, false, false, false, nil)
	if err != nil {
		log.Printf("從隊(duì)列 %v 獲取數(shù)據(jù)失敗, error: %v", config.Config.QueueName, err)
	} else {
		log.Println("從消費(fèi)隊(duì)列獲取任務(wù)成功")
	}
    
    // 阻塞住
	for {
		select {
		case message := <-deliveries:
			closed := database.CheckRabbitClosed(*database.RabbitChannel)
			if closed == 1 { // channel 已關(guān)閉,重連一下
				database.InitRabbitmq()
			} else {
				msgData := string(message.Body)
				request := model.Request{}
				err := json.Unmarshal([]byte(msgData), &request)
				if err != nil {
					log.Printf("解析rabbitmq數(shù)據(jù) %v 失敗, error: %v", msgData, err)
				} else {
					// TODO...
                    // 處理邏輯
					
				}
			}
		}
	}
}

main方法協(xié)程調(diào)用

package main

import (
	"log"
	"yy-data-processing/common/config"
	"yy-data-processing/common/database"
	"yy-data-processing/router"
	"yy-data-processing/service"
)

func main() {
	// 初始化路由
	routers := router.InitRouters()

	// 初始化RabbitMQ
	database.InitRabbitmq()
	go service.Producer()
	go service.Consumer()

	port := config.Config.Port
	if err := routers.Run(":" + port); err != nil {
		log.Printf("啟動(dòng)服務(wù)失敗: ", err)
	}

}

原文鏈接:https://blog.csdn.net/u013071014/article/details/128016871

欄目分類
最近更新