日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學(xué)無先后,達(dá)者為師

網(wǎng)站首頁 編程語言 正文

golang操作rocketmq的示例代碼_Golang

作者:專職 ? 更新時間: 2022-06-08 編程語言

RocketMQ 是什么

Github 上關(guān)于 RocketMQ 的介紹:
RcoketMQ 是一款低延遲、高可靠、可伸縮、易于使用的消息中間件。具有以下特性:

  • 支持發(fā)布/訂閱(Pub/Sub)和點(diǎn)對點(diǎn)(P2P)消息模型
  • 在一個隊(duì)列中可靠的先進(jìn)先出(FIFO)和嚴(yán)格的順序傳遞
  • 支持拉(pull)和推(push)兩種消息模式
  • 單一隊(duì)列百萬消息的堆積能力
  • 支持多種消息協(xié)議,如 JMS、MQTT 等
  • 分布式高可用的部署架構(gòu),滿足至少一次消息傳遞語義
  • 提供 docker 鏡像用于隔離測試和云集群部署
  • 提供配置、指標(biāo)和監(jiān)控等功能豐富的 Dashboard

對于這些特性描述,大家簡單過一眼就即可,深入學(xué)習(xí)之后自然就明白了。

下面看下golang操作rocketmq的示例代碼,內(nèi)容如下所示:

下載

go get github.com/apache/rocketmq-client-go/v2

代碼

func main() {
	// 1. 創(chuàng)建主題
	//CreateTopic("test-04", 10909)
	// 2. 生產(chǎn)者向主題中發(fā)送消息
	//SendSyncMessage("hello world0002")
	// 3. 消費(fèi)者訂閱主題并消費(fèi)
	SubscribeMessage()
}
func CreateTopic(topicName string, port int) {
	// 創(chuàng)建主題
	testAdmin, err := admin.NewAdmin(admin.WithResolver(primitive.NewPassthroughResolver([]string{"ip:server_port"})))
	if err != nil {
		fmt.Println(err)
	}
	err = testAdmin.CreateTopic(
		context.Background(),
		admin.WithTopicCreate(topicName),
		admin.WithBrokerAddrCreate(fmt.Sprintf("ip:%d", port)),
	)
	fmt.Println(err)
func SendSyncMessage(message string) {
	endPoint := []string{"ip:server_port"}
	p, err := rocketmq.NewProducer(
		producer.WithNameServer(endPoint),
		//producer.WithNsResolver(primitive.NewPassthroughResolver(endPoint)),
		producer.WithRetry(2),
	err = p.Start()
	result, err := p.SendSync(context.Background(), &primitive.Message{
		Topic: "test",
		Body:  []byte(message),
	})
	fmt.Println(result.Status, result)
func SubscribeMessage() {
	// 訂閱主題、并消費(fèi)
	c, err := rocketmq.NewPushConsumer(
		consumer.WithNameServer(endPoint),
		consumer.WithConsumerModel(consumer.Clustering),
		consumer.WithGroupName("GID_TEST01"),
		//fmt.Println(err)
	err = c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for i := range msgs {
			fmt.Printf("subscribe callback: %v \n", msgs[i])
		}
		return consumer.ConsumeSuccess, nil
		//fmt.Println(err.Error())
	// Note: start after subscribe
	err = c.Start()
		os.Exit(-1)
	c.Shutdown()

原文鏈接:https://www.cnblogs.com/mayanan/p/16106021.html

欄目分類
最近更新