首页 >>  正文

rabbitmq交换机

来源:baiyundou.net   日期:2024-07-16

作者:linzl

1. 为啥要用MQ

为啥使用,因为他很牛逼

2.使用docker部署单机RabbitMQ、go客户端库

docker 镜像

https://hub.docker.com/_/rabbitmq

docker pull rabbitmq:3.8.10-management-alpine

说明:management代表是带管理后台

启动容器

docker run -d --name my-rmq \n-e RABBITMQ_DEFAULT_USER=linzl \n-e RABBITMQ_DEFAULT_PASS=123 \n-p 8081:15672 \n-p 5672:5672 \nrabbitmq:3.8.10-management-alpine

rabbitmq golang 客户端库

https://github.com/streadway/amqp

go get -u github.com/streadway/amqp

测试golang 链接 mq

package main\n\nimport (\n\t"fmt"\n\t"github.com/streadway/amqp"\n\t"log"\n)\n\nfunc main() {\n\tdsn := fmt.Sprintf("amqp://%s:%s@%s:%d","linzl","123","192.168.1.6",5672)\n\tconnection, err := amqp.Dial(dsn)\n\tif err != nil {\n\t\tlog.Fatal(err)\n\t}\n\tdefer connection.Close()\n\tfmt.Println(connection)\n}

  1. 生产者创建channel发送消息给Exchange

  2. Exchange(有多种交换机)根据策略binding队列进行消息投递

  3. 队列具有推/拉模式

  4. 消费者使用channel获取消息,并确认接收或拒绝,重新入列给别的消费者

3. 用最简单的方式:生产者发送第一条消息

package main\n\nimport (\n\t"fmt"\n\t"github.com/streadway/amqp"\n\t"log"\n)\n\nfunc main() {\n\tconnection, err := amqp.Dial("amqp://linzl:[email protected]:5672")\n\tif err != nil {\n\t\tlog.Fatal(err)\n\t}\n\tdefer connection.Close()\n\t// 获取channel 连接\n\tchannelConn, err := connection.Channel()\n\tif err != nil {\n\t\tlog.Fatal(err)\n\t}\n\tdefer channelConn.Close()\n\n\t// 创建队列\n\tqueue, err := channelConn.QueueDeclare(\n\t\t"test_queue",\n\t\tfalse,\n\t\tfalse,\n\t\tfalse,\n\t\tfalse,\n\t\tnil,\n\t)\n\tif err != nil {\n\t\tlog.Fatal(err)\n\t}\n\n\t// 发生消息\n\terr = channelConn.Publish(\n\t\t"",\n\t\tqueue.Name,\n\t\tfalse,\n\t\tfalse,\n\t\tamqp.Publishing{\n\t\t\tContentType: "text/plain",\n\t\t\tBody:        []byte(`test002`),\n\t\t},\n\t)\n\tif err != nil {\n\t\tlog.Fatal(err)\n\t}\n\tfmt.Println("简单生产一条消息成功")\n}

4.用最简单的方式:消费者读取消息

对连接mq 简单封装

package AppInit\n\nimport (\n\t"fmt"\n\t"github.com/streadway/amqp"\n\t"log"\n)\n\nvar (\n\terr error\n\tMQConn *amqp.Connection\n)\n\nfunc init()  {\n\tdsn := fmt.Sprintf("amqp://%s:%s@%s:%d","linzl","123","192.168.1.6",5672)\n\tMQConn, err = amqp.Dial(dsn)\n\tif err != nil {\n\t\tlog.Fatal(err)\n\t}\n\n\tlog.Println(MQConn.Major)\n}\n\nfunc GetMQ()*amqp.Connection {\n\treturn MQConn\n}

消费者

package main\n\nimport (\n\t"fmt"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/AppInit"\n\t"log"\n)\n\nfunc main() {\n\tmq := AppInit.GetMQ()\n\tdefer mq.Close()\n\n\tchannelConn, err := mq.Channel()\n\tif err != nil {\n\t\tlog.Fatal(err)\n\t}\n\tdefer channelConn.Close()\n\n\t// 消费\n\tmsgs, err := channelConn.Consume(\n\t\t"test_queue",\n\t\t"Consumer01",\n\t\tfalse,\n\t\tfalse,\n\t\tfalse,\n\t\tfalse,\n\t\tnil,\n\t)\n\tif err != nil {\n\t\tlog.Fatal(err)\n\t}\n\tfor msg := range msgs {\n\t\tmsg.Ack(false) // 确认\n\t\tfmt.Println(msg.DeliveryTag,string(msg.Body))\n\t}\n\n}

5.简单API过程、注册流程、MQ操作简单封装

案例用户注册

简单的API过程、注册流程、MQ操作简单封装

gin 框架构建用户注册api

userModel

package User\n\ntype UserModel struct {\n\tUserID int64 `json:"user_id"`\n\tUserName string `json:"user_name"`\n}\n\nfunc NewUserModel() *UserModel {\n\treturn &UserModel{}\n}

user api

package main\n\nimport (\n\t"encoding/json"\n\t"github.com/gin-gonic/gin"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/UserReg/Models/User"\n\t"net/http"\n\t"time"\n)\n\nfunc main()  {\n\n\tengine := gin.New()\n\tengine.Handle(http.MethodPost,"/user", func(ctx *gin.Context) {\n\t\tuser := User.NewUserModel()\n\t\terr := ctx.ShouldBindJSON(user)\n\t\tif err != nil {\n\t\t\tctx.JSON(400,err.Error())\n\t\t\treturn\n\t\t}\n\t\tuser.UserID =time.Now().Unix() // 模拟用户注册入库\n\t\tif user.UserID > 0 {  // 假设入库成功\n\t\t\tbytes, _ := json.Marshal(user)\n\t\t\tLib.NewMQ().SendMessage(Lib.QUEUE_NEWUSER,string(bytes))\n\t\t}\n\t\tctx.JSON(200,user)\n\t})\n\n\tengine.Run(":6060")\n}

mq 操作简单封装

package Lib\n\nimport (\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/AppInit"\n\t"github.com/streadway/amqp"\n\t"log"\n)\n\nconst (\n\t// 用户注册队列名称\n\tQUEUE_NEWUSER = "newuser"\n)\n\ntype MQ struct {\n\tChannel *amqp.Channel\n}\n\nfunc NewMQ() *MQ {\n\tchannel, err := AppInit.GetMQ().Channel()\n\tif err != nil {\n\t\tlog.Println(err)\n\t}\n\treturn &MQ{Channel: channel}\n}\n\n// SendMessage 发生消息到mq\nfunc (this *MQ) SendMessage(queueName string, message string) error {\n\t_, err := this.Channel.QueueDeclare(queueName, false, false,\n\t\tfalse, false, nil)\n\tif err != nil {\n\t\treturn err\n\t}\n\treturn this.Channel.Publish("", queueName, false, false,\n\t\tamqp.Publishing{\n\t\t\tContentType: "text/plain",\n\t\t\tBody:        []byte(message),\n\t\t})\n}

6.定义交换机:向2个队列同时发送消息(QueueBind)

Exchange

Direct Exchange 也叫做直接模式交换机。交换机和和一个队列绑定起来,并指定路由键, 交换机会寻找匹配的路由键的绑定,并将消息路由给对应的队列

package Lib\n\nimport (\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/AppInit"\n\t"github.com/streadway/amqp"\n\t"log"\n)\n\nconst (\n\t// 用户注册队列名称\n\tQUEUE_NEWUSER = "newuser"\n)\n\ntype MQ struct {\n\tChannel *amqp.Channel\n}\n\nfunc NewMQ() *MQ {\n\tchannel, err := AppInit.GetMQ().Channel()\n\tif err != nil {\n\t\tlog.Println(err)\n\t}\n\treturn &MQ{Channel: channel}\n}\n\n// SendMessage 发生消息到mq\nfunc (this *MQ) SendMessage(queueName string, message string) error {\n\tqueue1, err := this.Channel.QueueDeclare(queueName, false, false,\n\t\tfalse, false, nil)\n\tif err != nil {\n\t\treturn err\n\t}\n\t// 假设是其他业务方用的队列\n\tqueue2, err := this.Channel.QueueDeclare(queueName+"other", false, false,\n\t\tfalse, false, nil)\n\tif err != nil {\n\t\treturn err\n\t}\n\n\t// 声明一个交换机\n\terr = this.Channel.ExchangeDeclare("UserExchange", "direct",\n\t\tfalse, false, false, false, nil)\n\tif err != nil {\n\t\treturn err\n\t}\n\n\t// 队列1与交换机绑定\n\terr = this.Channel.QueueBind(queue1.Name, "UserReg",\n\t\t"UserExchange", false, nil)\n\tif err != nil {\n\t\treturn err\n\t}\n\t// 队列2与交换机绑定\n\terr = this.Channel.QueueBind(queue2.Name, "UserReg",\n\t\t"UserExchange", false, nil)\n\tif err != nil {\n\t\treturn err\n\t}\n\n\treturn this.Channel.Publish("UserExchange", "UserReg", false, false,\n\t\tamqp.Publishing{\n\t\t\tContentType: "text/plain",\n\t\t\tBody:        []byte(message),\n\t\t})\n}

7.整理和调整代码结构、初始化队列等

初始化队列

go-rabbitmq/Lib/QueueInit.go

package Lib\n\nimport "fmt"\n\n// UserQueueInit 用户队列初始化..\nfunc UserQueueInit() error{\n\tmq := NewMQ()\n\tif mq == nil {\n\t\treturn fmt.Errorf("mq init err")\n\t}\n\tdefer mq.Channel.Close()\n\n\t// 声明交换机\n\terr := mq.Channel.ExchangeDeclare(USER_EXCHANGE, "direct", false,\n\t\tfalse, false, false, nil)\n\tif err != nil {\n\t\treturn fmt.Errorf("Exchange error:%s",err.Error())\n\t}\n\t// 声明队列及绑定\n\tqueues := fmt.Sprintf("%s,%s",QUEUE_NEWUSER,QUEUE_NEWUSER_OTHER01)\n\terr = mq.DecQueuueAndBind(queues, USER_EXCHANGE, USER_REG_ROUTER_KEY)\n\tif err != nil {\n\t\treturn fmt.Errorf("DecQueuueAndBind error:%s",err.Error())\n\t}\n\treturn nil\n}

SendMessage改造

package Lib\n\nimport (\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/AppInit"\n\t"github.com/streadway/amqp"\n\t"log"\n\t"strings"\n)\n\nconst (\n\t// 用户注册队列名称\n\tQUEUE_NEWUSER = "newuser"\n\t// 其他业务的新用户队列\n\tQUEUE_NEWUSER_OTHER01 = "newuser-other01"\n\t// 用户业务交换机\n\tUSER_EXCHANGE = "exchange-user"\n\t// 用户注册路由key\n\tUSER_REG_ROUTER_KEY = "router-key-userreg"\n)\n\ntype MQ struct {\n\tChannel *amqp.Channel\n}\n\nfunc NewMQ() *MQ {\n\tchannel, err := AppInit.GetMQ().Channel()\n\tif err != nil {\n\t\tlog.Println(err)\n\t}\n\treturn &MQ{Channel: channel}\n}\n// DecQueuueAndBind 声明队列及绑定队列,多个队列用逗号隔开\nfunc (this *MQ)DecQueuueAndBind(queues string,exchange string,routerKey string) error {\n\tqueueList := strings.Split(queues,",")\n\tfor _,queue := range queueList {\n\t\t// 声明队列\n\t\tq, err := this.Channel.QueueDeclare(queue, false, false,\n\t\t\tfalse, false, nil)\n\t\tif err != nil {\n\t\t\treturn err\n\t\t}\n\t\t// 绑定交换机和路由key\n\t\terr = this.Channel.QueueBind(q.Name, routerKey, exchange, false, nil)\n\t\tif err != nil {\n\t\t\treturn err\n\t\t}\n\t}\n\treturn nil\n}\n\n// SendMessage 发生消息到mq\nfunc (this *MQ) SendMessage(key string, exchange string,message string) error {\n\treturn this.Channel.Publish(exchange, key, false, false,\n\t\tamqp.Publishing{\n\t\t\tContentType: "text/plain",\n\t\t\tBody:        []byte(message),\n\t\t})\n}

拉起gin框架时初始化队列

package main\n\nimport (\n\t"context"\n\t"encoding/json"\n\t"fmt"\n\t"github.com/gin-gonic/gin"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/UserReg/Models/User"\n\t"log"\n\t"net/http"\n\t"os"\n\t"os/signal"\n\t"syscall"\n\t"time"\n)\n\nfunc main()  {\n\terrchan := make(chan error)\n\tengine := gin.Default()\n\tengine.Handle(http.MethodPost,"/user", func(ctx *gin.Context) {\n\t\tuser := User.NewUserModel()\n\t\terr := ctx.ShouldBindJSON(user)\n\t\tif err != nil {\n\t\t\tctx.JSON(400,err.Error())\n\t\t\treturn\n\t\t}\n\t\tuser.UserID =time.Now().Unix() // 模拟用户注册入库\n\t\tif user.UserID > 0 {  // 假设入库成功\n\t\t\tbytes, _ := json.Marshal(user)\n\t\t\tmq := Lib.NewMQ()\n\t\t\terr := mq.SendMessage(Lib.USER_REG_ROUTER_KEY, Lib.USER_EXCHANGE, string(bytes))\n\t\t\tif err != nil {\n\t\t\t\tlog.Println(err)\n\t\t\t\terrchan

8.客户端消费注册用户消息、确认消息

模拟消费

  1. 接收消息

  2. 模拟发生邮件

  3. ack 确认

消费

package main\n\nimport (\n\t"encoding/json"\n\t"fmt"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/UserReg/Models/User"\n\t"github.com/streadway/amqp"\n\t"time"\n)\n\nfunc SendMail(msgs 

MQ.go 新增Consume方法

package Lib\n\nimport (\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/AppInit"\n\t"github.com/streadway/amqp"\n\t"log"\n\t"strings"\n)\n\nconst (\n\t// 用户注册队列名称\n\tQUEUE_NEWUSER = "newuser"\n\t// 其他业务的新用户队列\n\tQUEUE_NEWUSER_OTHER01 = "newuser-other01"\n\t// 用户业务交换机\n\tUSER_EXCHANGE = "exchange-user"\n\t// 用户注册路由key\n\tUSER_REG_ROUTER_KEY = "router-key-userreg"\n)\n\ntype MQ struct {\n\tChannel *amqp.Channel\n}\n\nfunc NewMQ() *MQ {\n\tchannel, err := AppInit.GetMQ().Channel()\n\tif err != nil {\n\t\tlog.Println(err)\n\t}\n\treturn &MQ{Channel: channel}\n}\n// DecQueuueAndBind 声明队列及绑定队列,多个队列用逗号隔开\nfunc (this *MQ)DecQueuueAndBind(queues string,exchange string,routerKey string) error {\n\tqueueList := strings.Split(queues,",")\n\tfor _,queue := range queueList {\n\t\t// 声明队列\n\t\tq, err := this.Channel.QueueDeclare(queue, false, false,\n\t\t\tfalse, false, nil)\n\t\tif err != nil {\n\t\t\treturn err\n\t\t}\n\t\t// 绑定交换机和路由key\n\t\terr = this.Channel.QueueBind(q.Name, routerKey, exchange, false, nil)\n\t\tif err != nil {\n\t\t\treturn err\n\t\t}\n\t}\n\treturn nil\n}\n\n// SendMessage 发生消息到mq\nfunc (this *MQ) SendMessage(key string, exchange string,message string) error {\n\treturn this.Channel.Publish(exchange, key, false, false,\n\t\tamqp.Publishing{\n\t\t\tContentType: "text/plain",\n\t\t\tBody:        []byte(message),\n\t\t})\n}\n\nfunc (this *MQ)Consume(queueName string, key string,callback func(

9. 多消费者消费消息、重新入列

消费者改造支持多消费者

package main\n\nimport (\n\t"encoding/json"\n\t"flag"\n\t"fmt"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/UserReg/Models/User"\n\t"github.com/streadway/amqp"\n\t"log"\n\t"time"\n)\n\nfunc SendMail(msgs 

10.消费者限流:ACK后再收新消息

代码改造一下,使用协程消费

package main\n\nimport (\n\t"encoding/json"\n\t"flag"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/UserReg/Models/User"\n\t"github.com/streadway/amqp"\n\t"log"\n\t"time"\n)\n\nfunc Send(c string, msg amqp.Delivery) error {\n\ttime.time.Second * 3)\n\tuserModel := &User.UserModel{}\n\tjson.Unmarshal(msg.Body, userModel)\n\tlog.Printf("消费者:%s,向userid=%d的用户发生邮件n",c,userModel.UserID)\n\tmsg.Ack(false)\n\treturn nil\n}\n\nfunc SendMail(msgs 

消费者限流

mq.Channel.Qos(2, 0, false)  第一个参数prefetchCount 可以限制,当接受prefetchCount 条消息后

只有ack 之后可以继续接收消费下一条消息,起到保护消费者的作用

mq.Channel.Qos(2, 0, false)

11. 开启模式、记录失败的消息

当生产消息时,由于mq 的网络问题或是其他问题,可能出现发送失败的情况

当有些敏感信息又不能失败,需要确保每一条消息都发送成功

因此mq 有一个机制就是可以开发 模式,当给mq发送消息时,如果成功会有一个ack 回执

ack 成功说明发送消息成功,ack 失败时需要记录日志(写到mysql 或redis)什么的进行重发

第一步

  1. 开启模式

// SetConfirm 设置模式\nfunc (this *MQ)Setconfirm() {\n\terr := this.Channel.confirm(false)\n\tif err != nil {\n\t\tlog.Println(err)\n\t}\n}

  1. 在MQ 的结构体添加属性

type MQ struct {\n\tChannel *amqp.Channel\n\tnotifyConfirm chan amqp.Confirmation\n}

// SetConfirm 设置模式\nfunc (this *MQ)Setconfirm() {\n\terr := this.Channel.confirm(false)\n\tif err != nil {\n\t\tlog.Println(err)\n\t}\n\tthis.notifyConfirm = this.Channel.NotifyPublish(make(chan amqp.Confirmation))\n}

发生消息后,当服务器确认此chan 会有数据传输过来

发生消息时调用SetConfirm 开启 模式 完整代码

MQ.go

package Lib\n\nimport (\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/AppInit"\n\t"github.com/streadway/amqp"\n\t"log"\n\t"strings"\n)\n\nconst (\n\t// 用户注册队列名称\n\tQUEUE_NEWUSER = "newuser"\n\t// 其他业务的新用户队列\n\tQUEUE_NEWUSER_OTHER01 = "newuser-other01"\n\t// 用户业务交换机\n\tUSER_EXCHANGE = "exchange-user"\n\t// 用户注册路由key\n\tUSER_REG_ROUTER_KEY = "router-key-userreg"\n)\n\ntype MQ struct {\n\tChannel *amqp.Channel\n\tnotifyConfirm chan amqp.Confirmation\n}\n\nfunc NewMQ() *MQ {\n\tchannel, err := AppInit.GetMQ().Channel()\n\tif err != nil {\n\t\tlog.Println(err)\n\t}\n\treturn &MQ{Channel: channel}\n}\n// DecQueuueAndBind 声明队列及绑定队列,多个队列用逗号隔开\nfunc (this *MQ)DecQueuueAndBind(queues string,exchange string,routerKey string) error {\n\tqueueList := strings.Split(queues,",")\n\tfor _,queue := range queueList {\n\t\t// 声明队列\n\t\tq, err := this.Channel.QueueDeclare(queue, false, false,\n\t\t\tfalse, false, nil)\n\t\tif err != nil {\n\t\t\treturn err\n\t\t}\n\t\t// 绑定交换机和路由key\n\t\terr = this.Channel.QueueBind(q.Name, routerKey, exchange, false, nil)\n\t\tif err != nil {\n\t\t\treturn err\n\t\t}\n\t}\n\treturn nil\n}\n\n// SetConfirm 设置模式\nfunc (this *MQ)Setconfirm() {\n\terr := this.Channel.confirm(false)\n\tif err != nil {\n\t\tlog.Println(err)\n\t}\n\tthis.notifyConfirm = this.Channel.NotifyPublish(make(chan amqp.Confirmation))\n\tgo this.Listenconfirm()\n}\n// ListenConfirm 监听消息\nfunc (this *MQ)Listenconfirm() {\n\tdefer this.Channel.Close()\n\tret := 

生产者

package main\n\nimport (\n\t"context"\n\t"encoding/json"\n\t"fmt"\n\t"github.com/gin-gonic/gin"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/UserReg/Models/User"\n\t"log"\n\t"net/http"\n\t"os"\n\t"os/signal"\n\t"syscall"\n\t"time"\n)\n\nfunc main()  {\n\terrchan := make(chan error)\n\tengine := gin.Default()\n\tengine.Handle(http.MethodPost,"/user", func(ctx *gin.Context) {\n\t\tuser := User.NewUserModel()\n\t\terr := ctx.ShouldBindJSON(user)\n\t\tif err != nil {\n\t\t\tctx.JSON(400,err.Error())\n\t\t\treturn\n\t\t}\n\t\tuser.UserID =time.Now().Unix() // 模拟用户注册入库\n\t\tif user.UserID > 0 {  // 假设入库成功\n\t\t\tbytes, _ := json.Marshal(user)\n\t\t\tmq := Lib.NewMQ()\n\t\t\t// 开启 模式\n\t\t\tmq.Setconfirm()\n\t\t\terr := mq.SendMessage(Lib.USER_REG_ROUTER_KEY, Lib.USER_EXCHANGE, string(bytes))\n\t\t\tif err != nil {\n\t\t\t\tlog.Println(err)\n\t\t\t\terrchan

12.监听消息入列回执:NotifyReturn的用法

mandatory参数

如果为true,在exchange正常且可以到达的情况下。

如果exchange+routeKey 无法投递给queue,那么MQ会将消息还给生产者

如果为false,则直接丢弃

模拟无法投递到exchange+routeKey 通过rabbitmq 管理后台,手动解绑(写多个队列的需要全部解绑)

package Lib\n\nimport (\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/AppInit"\n\t"github.com/streadway/amqp"\n\t"log"\n\t"strings"\n)\n\nconst (\n\t// 用户注册队列名称\n\tQUEUE_NEWUSER = "newuser"\n\t// 其他业务的新用户队列\n\tQUEUE_NEWUSER_OTHER01 = "newuser-other01"\n\t// 用户业务交换机\n\tUSER_EXCHANGE = "exchange-user"\n\t// 用户注册路由key\n\tUSER_REG_ROUTER_KEY = "router-key-userreg"\n)\n\ntype MQ struct {\n\tChannel *amqp.Channel\n\tnotifyConfirm chan amqp.Confirmation\n\n\t// NotifyReturn的用法\n\tnotifyReturn chan amqp.Return\n}\n\nfunc NewMQ() *MQ {\n\tchannel, err := AppInit.GetMQ().Channel()\n\tif err != nil {\n\t\tlog.Println(err)\n\t}\n\treturn &MQ{Channel: channel}\n}\n// DecQueuueAndBind 声明队列及绑定队列,多个队列用逗号隔开\nfunc (this *MQ)DecQueuueAndBind(queues string,exchange string,routerKey string) error {\n\tqueueList := strings.Split(queues,",")\n\tfor _,queue := range queueList {\n\t\t// 声明队列\n\t\tq, err := this.Channel.QueueDeclare(queue, false, false,\n\t\t\tfalse, false, nil)\n\t\tif err != nil {\n\t\t\treturn err\n\t\t}\n\t\t// 绑定交换机和路由key\n\t\terr = this.Channel.QueueBind(q.Name, routerKey, exchange, false, nil)\n\t\tif err != nil {\n\t\t\treturn err\n\t\t}\n\t}\n\treturn nil\n}\n\n// SetConfirm 设置模式\nfunc (this *MQ)Setconfirm() {\n\terr := this.Channel.confirm(false)\n\tif err != nil {\n\t\tlog.Println(err)\n\t}\n\tthis.notifyConfirm = this.Channel.NotifyPublish(make(chan amqp.Confirmation))\n\tgo this.Listenconfirm()\n}\n// ListenConfirm 监听消息\nfunc (this *MQ)Listenconfirm() {\n\tdefer this.Channel.Close()\n\tret := 

SendMessage 之前需要 NotifyReturn

package main\n\nimport (\n\t"context"\n\t"encoding/json"\n\t"fmt"\n\t"github.com/gin-gonic/gin"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/UserReg/Models/User"\n\t"log"\n\t"net/http"\n\t"os"\n\t"os/signal"\n\t"syscall"\n\t"time"\n)\n\nfunc main()  {\n\terrchan := make(chan error)\n\tengine := gin.Default()\n\tengine.Handle(http.MethodPost,"/user", func(ctx *gin.Context) {\n\t\tuser := User.NewUserModel()\n\t\terr := ctx.ShouldBindJSON(user)\n\t\tif err != nil {\n\t\t\tctx.JSON(400,err.Error())\n\t\t\treturn\n\t\t}\n\t\tuser.UserID =time.Now().Unix() // 模拟用户注册入库\n\t\tif user.UserID > 0 {  // 假设入库成功\n\t\t\tbytes, _ := json.Marshal(user)\n\t\t\tmq := Lib.NewMQ()\n\t\t\t// 开启 模式\n\t\t\tmq.Setconfirm()\n\n\t\t\t// 监听return 需要在发送消息之前\n\t\t\tmq.NotifyReturn()\n\t\t\terr := mq.SendMessage(Lib.USER_REG_ROUTER_KEY, Lib.USER_EXCHANGE, string(bytes))\n\t\t\tif err != nil {\n\t\t\t\tlog.Println(err)\n\t\t\t\terrchan

13. 以用户注册为例产生的事务需求、延迟队列使用

基本实现

  1. 生产者注册成功之后发生消息

  2. 消息者接受消息后,调用邮件服务

  3. 调用失败。重新入列(要加个延迟时间,失败次数越多,延迟时间越长)

  4. 超过最大重试次数。就不发邮件了

安装插件

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

这是一个延迟交换机插件。省去我们自己写规则的麻烦

由于我们使用的是3.8.10。因此使用3.8.10对应的插件

拷贝plugins中,容器对应的目录是/opt/rabbitmq/plugins

docker cp rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez my-rmq:/opt/rabbitmq/plugins

启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

延迟队列使用

官方文档:

// ... elided code ...\nMap args = new HashMap();\nargs.put("x-delayed-type", "direct");\nchannel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);\n// ... more code ...

// ... elided code ...\nbyte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");\nMap headers = new HashMap();\nheaders.put("x-delay", 5000);\nAMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);\nchannel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);\n\nbyte[] messageBodyBytes2 = "more delayed payload".getBytes("UTF-8");\nMap headers2 = new HashMap();\nheaders2.put("x-delay", 1000);\nAMQP.BasicProperties.Builder props2 = new AMQP.BasicProperties.Builder().headers(headers2);\nchannel.basicPublish("my-exchange", "", props2.build(), messageBodyBytes2);\n// ... more code ...

因此go 代码改动

定义交互机的kind 应该使用x-delayed-message,args 用map[string]interface{}{"x-delayed-type":"direct"}

// UserDelayInit 创建用户延迟交换机\nfunc UserDelayInit() error {\n\tmq := NewMQ()\n\tif mq == nil {\n\t\treturn fmt.Errorf("UserDelayInit init error")\n\t}\n\tdefer mq.Channel.Close()\n\t// 声明交换机\n\terr := mq.Channel.ExchangeDeclare(USER_EXCHANGE_DELAY, "x-delayed-message", false, false,\n\t\tfalse, false, map[string]interface{}{"x-delayed-type":"direct"})\n\tif err != nil {\n\t\treturn fmt.Errorf("UserDelayInit ExchangeDeclare error")\n\t}\n\t// 声明队列名称及绑定\n\tqueues := fmt.Sprintf("%s",QUEUE_NEWUSER)\n\terr = mq.DecQueuueAndBind(queues, USER_EXCHANGE_DELAY, USER_REG_ROUTER_KEY)\n\tif err != nil {\n\t\treturn fmt.Errorf("DecQueuueAndBind error:%s",err.Error())\n\t}\n\treturn nil\n}

发送消息时需要设置Headers: map[string]interface{}{"x-delay":delay}, // 单位毫秒

// SendDelayMessage 发生延迟消息到mq\n// delay 单位是ms\nfunc (this *MQ) SendDelayMessage(key string, exchange string,message string,delay int) error {\n\treturn this.Channel.Publish(exchange, key, true, false,\n\t\tamqp.Publishing{\n\t\t\tHeaders: map[string]interface{}{"x-delay":delay}, // 单位毫秒\n\t\t\tContentType: "text/plain",\n\t\t\tBody:        []byte(message),\n\t\t})\n}

生产部分

go func() {\n    err := Lib.UserQueueInit()\n    if err !=nil {\n        errchan

消费者不需要调整

消息者接收消息时会延迟delay 毫秒后收到

14. 记录消费者调用失败次数、逼格SQL技巧

首先建表 user_notify

 `user_notify` (\n  `user_id` int(11) NOT NULL,\n  `notify_num` int(11) NOT NULL DEFAULT '1',\n  `is_done` int(11) NOT NULL DEFAULT '0',\n  `updatetime` datetime NOT NULL,\n  PRIMARY KEY (`user_id`)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

一旦调用邮件服务失败,写入这个表

使用MySQL库

扩展库:https://github.com/jmoiron/sqlx

安装

go get -u github.com/jmoiron/sqlx

mysql驱动:https://github.com/go-sql-driver/mysql

安装

go get -u github.com/go-sql-driver/mysql

mysql记录常规做法(伪代码)

开启事物\n    取出该条记录。\n    if 没有\n        insert info ...\n    else \n        if notify_num >=5 // 失败5次,程序里订阅\n             \n\t\t\tuser_money=user_money-:money \n\t\t\twhere user_name=:from =:money`\n\n\tresult, err := tx.NamedExec(sql1, tm)\n\n\tif err != nil {\n\t\treturn err\n\t}\n\taffected, err := result.RowsAffected()\n\tif err != nil {\n\t\treturn err\n\t}\n\t// 受影响行为0 代表木有扣款\n\tif affected == 0 {\n\t\terr = tx.Rollback() // 回滚\n\t\tif err != nil {\n\t\t\treturn err\n\t\t}\n\t\treturn fmt.Errorf("扣款失败1111")\n\t}\n\tsql2 := "(:from,:to,:money,NOW())"\n\n\t// 写日志表\n\tresult, err = tx.NamedExec(sql2, tm)\n\tif err != nil {\n\t\terr2 := tx.Rollback() // 回滚\n\t\tif err2 != nil {\n\t\t\treturn err\n\t\t}\n\t\treturn fmt.Errorf("扣款失败2222:%s",err.Error())\n\t}\n\taffected, err = result.RowsAffected()\n\tif err != nil {\n\t\terr = tx.Rollback() // 回滚\n\t\tif err != nil {\n\t\t\treturn err\n\t\t}\n\t\treturn fmt.Errorf("扣款失败3333",err.Error())\n\t}\n\t// 受影响行为0 代表木有扣款\n\tif affected == 0 {\n\t\terr = tx.Rollback() // 回滚\n\t\tif err != nil {\n\t\t\treturn err\n\t\t}\n\t\treturn fmt.Errorf("扣款失败,写日志表出错")\n\t}\n\ttx.Commit()\n\treturn nil\n}

a公司实际扣款操作

package main\n\nimport (\n\t"fmt"\n\t"github.com/gin-gonic/gin"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Trans"\n\t"log"\n\t"net/http"\n\t"os"\n\t"os/signal"\n\t"syscall"\n)\n\nfunc main() {\n\tengine := gin.Default()\n\tengine.Use(Trans.HandleErr())\n\tengine.POST("/", func(ctx *gin.Context) {\n\t\ttransModel := Trans.NewTransModel()\n\t\terr := ctx.ShouldBindJSON(&transModel)\n\t\tTrans.CheckErr(err,"ShouldBindJSON error:")\n\n\t\t// 执行转账\n\t\terr = Trans.TransMoney(transModel)\n\t\tTrans.CheckErr(err,"TransMoney error:")\n\t\tctx.JSON(200,gin.H{"result":transModel.String()})\n\n\t})\n\terrChan := make(chan error)\n\tserver := http.Server{\n\t\tAddr: ":6060",\n\t\tHandler: engine,\n\t}\n\tgo func() {\n\t\terr := server.ListenAndServe()\n\t\tif err != nil {\n\t\t\terrChan

18.A公司转账业务逻辑:记录日志后发送消息到mq

初始化mq 队列

func TransInit() error {\n\tmq := NewMQ()\n\tif mq == nil {\n\t\treturn fmt.Errorf("UserDelayInit init error")\n\t}\n\tdefer mq.Channel.Close()\n\terr := mq.Channel.ExchangeDeclare(TRANS_EXCHANGE, "direct", false,\n\t\tfalse, false, false, nil)\n\tif err != nil {\n\t\treturn fmt.Errorf("mq.Channel.ExchangeDeclare TRANS_EXCHANGE error:%s",err.Error())\n\t}\n\terr = mq.DecQueuueAndBind(TRANS_QUEUE, TRANS_EXCHANGE, TRANS_ROUTER_KEY)\n\tif err != nil {\n\t\treturn fmt.Errorf("trans DecQueuueAndBind error:%s",err.Error())\n\t}\n\treturn nil\n}

拉起框架时,起协程拉起mq

// 初始化队列\n\tgo func() {\n\t\terr := Lib.TransInit()\n\t\tif err != nil {\n\t\t\terrChan

发送消息到mq

engine.POST("/", func(ctx *gin.Context) {\n    transModel := Trans.NewTransModel()\n    err := ctx.ShouldBindJSON(&transModel)\n    Trans.CheckErr(err,"ShouldBindJSON error:")\n\n    // 执行转账\n    err = Trans.TransMoney(transModel)\n    Trans.CheckErr(err,"TransMoney error:")\n\n    // 写mq\n    mq := Lib.NewMQ()\n    jsonBytes, _ := json.Marshal(transModel)\n    err = mq.SendMessage(Lib.TRANS_ROUTER_KEY, Lib.TRANS_EXCHANGE, string(jsonBytes))\n    Trans.CheckErr(err,"发送消息队列失败了")\n    ctx.JSON(200,gin.H{"result":transModel.String()})\n\n})

19.A公司转账业务逻辑:定时”无脑”补偿机制(上)

不管发送成功与否

思路如下: 1、我们写个 “死循环”程序

2、定时取5秒或自定义秒内 :status==0 的数据,再发一次消息

3、设定定时任务。定时清理20秒内(或自定义)status==0的消息,把它改为status=2

定时任务 第三方库

看这里 https://github.com/robfig/cron

安装: go get github.com/robfig/cron/[email protected]

https://www.jtthink.com/course/play/2461

定时任务补偿代码

package main\n\nimport (\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Trans"\n\t"github.com/robfig/cron/v3"\n\t"log"\n)\n\nconst failSql = ` STATUS=2 where \n\t\tTIMESTAMPDIFF(SECOND,updatetime,now())>30 >2`\n\n\nvar MyCron *cron.Cron\n\nfunc CronInit()error  {\n\tMyCron = cron.New(cron.WithSeconds())\n\t_, err := MyCron.AddFunc("0/3 * * * * *", FailTransLog)\n\treturn err\n}\n\n// 定时取消订单\nfunc FailTransLog()  {\n\t_, err := Trans.GetDB().(failSql)\n\tif err != nil {\n\t\tlog.Println(err)\n\t}\n\tlog.Println("更新成功")\n}\n\nfunc main() {\n\terrChan := make(chan error)\n\tgo func() {\n\t\terr := Trans.InitDB("a")\n\t\tif err != nil {\n\t\t\terrChan

20.A公司转账逻辑: 补偿机制之交易失败后“还钱 ”

两个任务

取消交易

 STATUS=2 where TIMESTAMPDIFF(SECOND,updatetime,now())>20 >2

_, err := MyCron.AddFunc("0/3 * * * * *", FailTransLog)\n\tif err != nil {\n\t\treturn err\n\t}

还钱

`,money from `translog` where `status`=2 0 limit 10
// 还钱\n_, err = MyCron.AddFunc("0/4 * * * * *", BackMoney)\nif err != nil {\n    return err\n}

SQL

首先加个字段: isback ,money from translog  where status=2 0 limit 10

这里面为了防止 数据不一致,都要依赖数据库事务

做个统一的事务提交

func clearTx(tx *sqlx.Tx) {\n\terr := tx.Commit()\n\tif err != nil && err != sql.ErrTxDone {\n\t\tlog.Println("tx err",err)\n\t}\n\tislock = false\n}

全部代码

package main\n\nimport (\n\t"context"\n\t"database/sql"\n\t"github.com/jmoiron/sqlx"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Trans"\n\t"github.com/robfig/cron/v3"\n\t"log"\n\t"time"\n)\n\n// 取消订单\nconst failSql = ` STATUS=2 where \n\t\tTIMESTAMPDIFF(SECOND,updatetime,now())>30 >2`\n\n// 从日志表里取出status=2 并且isback=0 的进行还钱操作\nconst backSql = "`,money from `translog` where `status`=2 0 limit 10"\n\n// 锁 防止上一个任务没执行完,下一个任务又开始了,产生脏读(只适用于单线程,通过变量控制锁)\nvar islock = false\n\nfunc clearTx(tx *sqlx.Tx) {\n\terr := tx.Commit()\n\tif err != nil && err != sql.ErrTxDone {\n\t\tlog.Println("tx err",err)\n\t}\n\tislock = false\n}\n// 还钱\nfunc BackMoney() {\n\tif islock {\n\t\tlog.Println("已经锁住了")\n\t\treturn\n\t}\n\ttxx, err := Trans.GetDB().BeginTxx(context.Background(), nil)\n\n\tif err != nil {\n\t\tlog.Println("事务失败",err)\n\t\treturn\n\t}\n\tislock = true // 加锁\n\tdefer clearTx(txx) // 清理事物\n\ttime.time.Second * 8)\n\n\trows, err := txx.Queryx(backSql)\n\tif err != nil {\n\t\tlog.Println("Queryx err:",err)\n\t\ttxx.Rollback()\n\t}\n\n\tdefer rows.Close()\n\n\ttransModels := []Trans.TransModel{}\n\terr = sqlx.StructScan(rows, &transModels)\n\tif err != nil {\n\t\tlog.Println("StructScan err:",err)\n\t\ttxx.Rollback()\n\t}\n\n\t// 还钱操作\n\tfor _, row := range transModels {\n\t\t_, err = txx.(" user_money=user_money+? where user_name=?",\n\t\t\trow.Money, row.From)\n\t\tif err !=nil {\n\t\t\ttxx.Rollback()\n\t\t}\n\t\t_, err = txx.(" isback=1 where tid=?",row.Tid)\n\t\tif err !=nil {\n\t\t\ttxx.Rollback()\n\t\t}\n\t}\n}\n\n\nvar MyCron *cron.Cron\n\nfunc CronInit()error  {\n\tMyCron = cron.New(cron.WithSeconds())\n\t_, err := MyCron.AddFunc("0/3 * * * * *", FailTransLog)\n\tif err != nil {\n\t\treturn err\n\t}\n\t// 还钱\n\t_, err = MyCron.AddFunc("0/4 * * * * *", BackMoney)\n\tif err != nil {\n\t\treturn err\n\t}\n\treturn err\n}\n\n// 定时取消订单\nfunc FailTransLog()  {\n\t_, err := Trans.GetDB().(failSql)\n\tif err != nil {\n\t\tlog.Println(err)\n\t}\n\tlog.Println("更新成功")\n}\n\nfunc main() {\n\terrChan := make(chan error)\n\tgo func() {\n\t\terr := Trans.InitDB("a")\n\t\tif err != nil {\n\t\t\terrChan

21.补偿机制之重发MQ消息、B公司记录日志

今天完成的任务是

取出交易时间在8秒内,且status=0的数据,进行MQ 重发

1、SQL如下 translog where TIMESTAMPDIFF(SECOND,updatetime,now())<=8 0

2、定时器设置:为 每隔2秒 处理。

B公司日志表

和A公司一样。 不需要IsBack\n\ntid 注意 不需要自增

b公司消费代码

package main\n\nimport (\n\t"encoding/json"\n\t"flag"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Trans"\n\t"github.com/streadway/amqp"\n\t"log"\n\t"fmt"\n)\n\nfunc saveLog(tm *Trans.TransModel, msg amqp.Delivery) {\n\tfmt.Println(tm.Tid,tm.From,tm.Money)\n\tsql := "(?,?,?,?,now())"\n\t_, err := Trans.GetDB().(sql, tm.Tid,tm.From,tm.To,tm.Money)\n\tif err != nil {\n\t\tlog.Println("1111",err)\n\t}\n\tmsg.Ack(false)\n}\n\nfunc myconsumer(messages 

22.B公司业务逻辑:确认收钱

A和B 要约定个 回调地址(A是回调地址)   http://localhost:8080/callback-----A

参数:tid

SQL status=1 where tid=? 0

A 公司回调接口

// 回调接口\nengine.POST("/callback", func(ctx *gin.Context) {\n    tid := ctx.PostForm("tid")\n    sql := " `status`=1 where tid=? 0"\n    result, err := Trans.GetDB().(sql, tid)\n    affected, err2 := result.RowsAffected()\n    if err != nil || err2 != nil || affected != 1 {\n        ctx.String(200,"error")\n    } else {\n        ctx.String(200,"success")\n    }\n})

B公司使用mysql 事物保证日志及确认收钱及回调成功

B消费者 消费到记录后 执行两个过程 1) 插记录 2)把钱更新给用户 3) 回调接口 3步必须都成功。否则回滚数据库。

package main\n\nimport (\n\t"context"\n\t"database/sql"\n\t"encoding/json"\n\t"flag"\n\t"github.com/jmoiron/sqlx"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Trans"\n\t"github.com/streadway/amqp"\n\t"io/ioutil"\n\t"log"\n\t"fmt"\n\t"net/http"\n\t"strings"\n)\n\n\nfunc clearTx(tx *sqlx.Tx) {\n\terr := tx.Commit()\n\tif err != nil && err != sql.ErrTxDone {\n\t\tlog.Println("clearTx error",err)\n\t}\n}\n\n\nfunc saveLog(tm *Trans.TransModel, msg amqp.Delivery) {\n\tfmt.Println(tm.Tid,tm.From,tm.Money)\n\tsql := "(?,?,?,?,now())"\n\t_, err := Trans.GetDB().(sql, tm.Tid,tm.From,tm.To,tm.Money)\n\tif err != nil {\n\t\tlog.Println("1111",err)\n\t}\n\tmsg.Ack(false)\n}\n\nfunc saveLogWithTx(tm *Trans.TransModel, msg amqp.Delivery)  {\n\ttxx, err := Trans.GetDB().BeginTxx(context.Background(), nil)\n\tif err != nil {\n\t\tlog.Println("BeginTxx error",err)\n\t\treturn\n\t}\n\n\tdefer clearTx(txx)\n\tsql := "(?,?,?,?,now())"\n\t_, err = txx.(sql, tm.Tid, tm.From, tm.To, tm.Money)\n\tif err != nil {\n\t\tlog.Println("(:order_no,:order_user,:order_time)", req)\n\t\t\tif err != nil {\n\t\t\t\tlog.Println(err)\n\t\t\t}\n\t\t}\n\t\tmsg.Ack(false)\n\t}\n}\n\n\nfunc main() {\n\tmq := Lib.NewMQ()\n\n\tdefer mq.Channel.Close()\n\n\terr := Trans.InitDB("a")\n\tif err != nil {\n\t\tlog.Fatal(err)\n\t}\n\n\tmq.Channel.Qos(2,0,false)\n\n\tmq.Consume(Lib.ORDER_QUEUE,"消费者1", saveOrder)\n}

查看文章精彩评论,请前往什么值得买进行阅读互动

","gnid":"98341c1f478db84bf","img_data":[{"flag":2,"img":[{"desc":"","height":"389","title":"","url":"https://p0.ssl.img.360kuai.com/t018d54c51ce390ac56.webp","width":"702"}]}],"original":0,"pat":"art_src_1,fts0,sts0","powerby":"pika","pub_time":1695891920000,"pure":"","rawurl":"http://zm.news.so.com/bab4d4d9485a5d4b43919ad4d896220e","redirect":0,"rptid":"526da860dcbebebb","rss_ext":[],"s":"t","src":"什么值得买","tag":[],"title":"go web+RabbitMQ实战速学

易贝盆942为什么我要放弃 RabbitMQ -
冶迹康17684295957 ______ 1. 需要“消息延迟”功能 这对我们来说是很重要的业务需求.当顾客订了一个服务,首先我们会发送描述相信指令的短信,然后我们会在两分钟后发送第二条描述详情的短信,而不是两条一起发送.我们希望通过这样,留给用户阅读的时间(...

易贝盆942rabbitmq适合做长连接吗 -
冶迹康17684295957 ______ 如果可以,最好不要用短链接,尤其是消费者端,因为会耗费很大cpu.而大量连接也会造成内存增多.

易贝盆942如何优雅的使用RabbitMQ -
冶迹康17684295957 ______ 按安装驱动程序,然后Fn配合无线网卡快捷键开启无线网卡,然后就可以用了.

易贝盆942如何查看rabbitmq是否启动了 -
冶迹康17684295957 ______ 可能是系统有问题了.直接换个验证过的系统盘重装系统就行了,这样就可以全程自动、顺利解决 win7系统中软件无法运行 的问题了.用u盘或者硬盘这些都是可以的,且安装速度非常快.但关键是:要有兼容性好的(兼容ide、achi、Raid模式的安装)

易贝盆942rabbitmq必须用root用户启动吗 -
冶迹康17684295957 ______ 需要吧

易贝盆942如何测试 rabbitmq 的性能 -
冶迹康17684295957 ______ 测试 rabbitmq 的性能方法如下:1、声明7个具有不同属性的queue,分别和名为test_exchage的exchange进行绑定(因为exchange为fanout类型,所以测试代码中的routing_key其实是不起作用的);2、向exchange发送具有persistent属性的消息(delivery_mode=2);3、创建7个消费者分别从上述7个queue中获取消息;4、测试结果如下

易贝盆942使用 RabbitMQ 为什么有时候会丢数据 -
冶迹康17684295957 ______ 一般情况下,是配置的原因,应该是你配置了自动确认,又写了代码进行手动确认.当你配置了自动确认时,调用basicConsume方法时rabbitmq服务端返回的确认码不是唯一的,会重复,所以又进行了手动确认,就容易导致消息丢失.如果你用的是java+spring,配置手动确认如下:其中,ackMessageListener是实现了ChannelAwareMessageListener接口的实现类实例.

易贝盆942rabbitmq消息真的可以持久化吗 -
冶迹康17684295957 ______ 消息传递的速度:用MSMQ/RabbitMq,等带持久化功能的队列组件;如果嫌太慢,就用ZeroMq(无消息持久化功能),但可以达到30W消息每秒;事件持久化的速度:由于事件都是.

易贝盆942rabbitmq如何去做流控测试 -
冶迹康17684295957 ______ rabbit mq失败后怎么执行下一个队列1、笨拙点方法,就是轮循,consume的阻塞监听可以设置timeout,通过设置一个较小的timeout,可以轮流监听几个channel,变相实现监听多个queue,对性能要求不是很高,可以使用这种方法2、还有个办...

易贝盆942rabbitmq 支持android或ios吗 -
冶迹康17684295957 ______ 您好,很高兴回答您的问题.Silverlight是建立在windows系统上的,而Android或ios不需要这个软件.这是由系统框架的差异和内核的不同造成的.b

(编辑:自媒体)
关于我们 | 客户服务 | 服务条款 | 联系我们 | 免责声明 | 网站地图 @ 白云都 2024