rabbitmq交换机
作者:linzl
![](https://p0.ssl.img.360kuai.com/t018d54c51ce390ac56.webp)
1. 为啥要用MQ
为啥使用,因为他很牛逼
2.使用docker部署单机RabbitMQ、go客户端库
docker 镜像
https://hub.docker.com/_/rabbitmq
docker pull rabbitmq:3.8.10-management-alpine
说明:management代表是带管理后台
启动容器
docker run -d --name my-rmq \n-e RABBITMQ_DEFAULT_USER=linzl \n-e RABBITMQ_DEFAULT_PASS=123 \n-p 8081:15672 \n-p 5672:5672 \nrabbitmq:3.8.10-management-alpine
rabbitmq golang 客户端库
https://github.com/streadway/amqp
go get -u github.com/streadway/amqp
测试golang 链接 mq
package main\n\nimport (\n\t"fmt"\n\t"github.com/streadway/amqp"\n\t"log"\n)\n\nfunc main() {\n\tdsn := fmt.Sprintf("amqp://%s:%s@%s:%d","linzl","123","192.168.1.6",5672)\n\tconnection, err := amqp.Dial(dsn)\n\tif err != nil {\n\t\tlog.Fatal(err)\n\t}\n\tdefer connection.Close()\n\tfmt.Println(connection)\n}
生产者创建channel发送消息给Exchange
Exchange(有多种交换机)根据策略binding队列进行消息投递
队列具有推/拉模式
消费者使用channel获取消息,并确认接收或拒绝,重新入列给别的消费者
3. 用最简单的方式:生产者发送第一条消息
package main\n\nimport (\n\t"fmt"\n\t"github.com/streadway/amqp"\n\t"log"\n)\n\nfunc main() {\n\tconnection, err := amqp.Dial("amqp://linzl:[email protected]:5672")\n\tif err != nil {\n\t\tlog.Fatal(err)\n\t}\n\tdefer connection.Close()\n\t// 获取channel 连接\n\tchannelConn, err := connection.Channel()\n\tif err != nil {\n\t\tlog.Fatal(err)\n\t}\n\tdefer channelConn.Close()\n\n\t// 创建队列\n\tqueue, err := channelConn.QueueDeclare(\n\t\t"test_queue",\n\t\tfalse,\n\t\tfalse,\n\t\tfalse,\n\t\tfalse,\n\t\tnil,\n\t)\n\tif err != nil {\n\t\tlog.Fatal(err)\n\t}\n\n\t// 发生消息\n\terr = channelConn.Publish(\n\t\t"",\n\t\tqueue.Name,\n\t\tfalse,\n\t\tfalse,\n\t\tamqp.Publishing{\n\t\t\tContentType: "text/plain",\n\t\t\tBody: []byte(`test002`),\n\t\t},\n\t)\n\tif err != nil {\n\t\tlog.Fatal(err)\n\t}\n\tfmt.Println("简单生产一条消息成功")\n}
4.用最简单的方式:消费者读取消息
对连接mq 简单封装
package AppInit\n\nimport (\n\t"fmt"\n\t"github.com/streadway/amqp"\n\t"log"\n)\n\nvar (\n\terr error\n\tMQConn *amqp.Connection\n)\n\nfunc init() {\n\tdsn := fmt.Sprintf("amqp://%s:%s@%s:%d","linzl","123","192.168.1.6",5672)\n\tMQConn, err = amqp.Dial(dsn)\n\tif err != nil {\n\t\tlog.Fatal(err)\n\t}\n\n\tlog.Println(MQConn.Major)\n}\n\nfunc GetMQ()*amqp.Connection {\n\treturn MQConn\n}
消费者
package main\n\nimport (\n\t"fmt"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/AppInit"\n\t"log"\n)\n\nfunc main() {\n\tmq := AppInit.GetMQ()\n\tdefer mq.Close()\n\n\tchannelConn, err := mq.Channel()\n\tif err != nil {\n\t\tlog.Fatal(err)\n\t}\n\tdefer channelConn.Close()\n\n\t// 消费\n\tmsgs, err := channelConn.Consume(\n\t\t"test_queue",\n\t\t"Consumer01",\n\t\tfalse,\n\t\tfalse,\n\t\tfalse,\n\t\tfalse,\n\t\tnil,\n\t)\n\tif err != nil {\n\t\tlog.Fatal(err)\n\t}\n\tfor msg := range msgs {\n\t\tmsg.Ack(false) // 确认\n\t\tfmt.Println(msg.DeliveryTag,string(msg.Body))\n\t}\n\n}
5.简单API过程、注册流程、MQ操作简单封装
案例用户注册
简单的API过程、注册流程、MQ操作简单封装
gin 框架构建用户注册api
userModel
package User\n\ntype UserModel struct {\n\tUserID int64 `json:"user_id"`\n\tUserName string `json:"user_name"`\n}\n\nfunc NewUserModel() *UserModel {\n\treturn &UserModel{}\n}
user api
package main\n\nimport (\n\t"encoding/json"\n\t"github.com/gin-gonic/gin"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/UserReg/Models/User"\n\t"net/http"\n\t"time"\n)\n\nfunc main() {\n\n\tengine := gin.New()\n\tengine.Handle(http.MethodPost,"/user", func(ctx *gin.Context) {\n\t\tuser := User.NewUserModel()\n\t\terr := ctx.ShouldBindJSON(user)\n\t\tif err != nil {\n\t\t\tctx.JSON(400,err.Error())\n\t\t\treturn\n\t\t}\n\t\tuser.UserID =time.Now().Unix() // 模拟用户注册入库\n\t\tif user.UserID > 0 { // 假设入库成功\n\t\t\tbytes, _ := json.Marshal(user)\n\t\t\tLib.NewMQ().SendMessage(Lib.QUEUE_NEWUSER,string(bytes))\n\t\t}\n\t\tctx.JSON(200,user)\n\t})\n\n\tengine.Run(":6060")\n}
mq 操作简单封装
package Lib\n\nimport (\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/AppInit"\n\t"github.com/streadway/amqp"\n\t"log"\n)\n\nconst (\n\t// 用户注册队列名称\n\tQUEUE_NEWUSER = "newuser"\n)\n\ntype MQ struct {\n\tChannel *amqp.Channel\n}\n\nfunc NewMQ() *MQ {\n\tchannel, err := AppInit.GetMQ().Channel()\n\tif err != nil {\n\t\tlog.Println(err)\n\t}\n\treturn &MQ{Channel: channel}\n}\n\n// SendMessage 发生消息到mq\nfunc (this *MQ) SendMessage(queueName string, message string) error {\n\t_, err := this.Channel.QueueDeclare(queueName, false, false,\n\t\tfalse, false, nil)\n\tif err != nil {\n\t\treturn err\n\t}\n\treturn this.Channel.Publish("", queueName, false, false,\n\t\tamqp.Publishing{\n\t\t\tContentType: "text/plain",\n\t\t\tBody: []byte(message),\n\t\t})\n}
6.定义交换机:向2个队列同时发送消息(QueueBind)
Exchange
Direct Exchange 也叫做直接模式交换机。交换机和和一个队列绑定起来,并指定路由键, 交换机会寻找匹配的路由键的绑定,并将消息路由给对应的队列
package Lib\n\nimport (\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/AppInit"\n\t"github.com/streadway/amqp"\n\t"log"\n)\n\nconst (\n\t// 用户注册队列名称\n\tQUEUE_NEWUSER = "newuser"\n)\n\ntype MQ struct {\n\tChannel *amqp.Channel\n}\n\nfunc NewMQ() *MQ {\n\tchannel, err := AppInit.GetMQ().Channel()\n\tif err != nil {\n\t\tlog.Println(err)\n\t}\n\treturn &MQ{Channel: channel}\n}\n\n// SendMessage 发生消息到mq\nfunc (this *MQ) SendMessage(queueName string, message string) error {\n\tqueue1, err := this.Channel.QueueDeclare(queueName, false, false,\n\t\tfalse, false, nil)\n\tif err != nil {\n\t\treturn err\n\t}\n\t// 假设是其他业务方用的队列\n\tqueue2, err := this.Channel.QueueDeclare(queueName+"other", false, false,\n\t\tfalse, false, nil)\n\tif err != nil {\n\t\treturn err\n\t}\n\n\t// 声明一个交换机\n\terr = this.Channel.ExchangeDeclare("UserExchange", "direct",\n\t\tfalse, false, false, false, nil)\n\tif err != nil {\n\t\treturn err\n\t}\n\n\t// 队列1与交换机绑定\n\terr = this.Channel.QueueBind(queue1.Name, "UserReg",\n\t\t"UserExchange", false, nil)\n\tif err != nil {\n\t\treturn err\n\t}\n\t// 队列2与交换机绑定\n\terr = this.Channel.QueueBind(queue2.Name, "UserReg",\n\t\t"UserExchange", false, nil)\n\tif err != nil {\n\t\treturn err\n\t}\n\n\treturn this.Channel.Publish("UserExchange", "UserReg", false, false,\n\t\tamqp.Publishing{\n\t\t\tContentType: "text/plain",\n\t\t\tBody: []byte(message),\n\t\t})\n}
7.整理和调整代码结构、初始化队列等
初始化队列
go-rabbitmq/Lib/QueueInit.go
package Lib\n\nimport "fmt"\n\n// UserQueueInit 用户队列初始化..\nfunc UserQueueInit() error{\n\tmq := NewMQ()\n\tif mq == nil {\n\t\treturn fmt.Errorf("mq init err")\n\t}\n\tdefer mq.Channel.Close()\n\n\t// 声明交换机\n\terr := mq.Channel.ExchangeDeclare(USER_EXCHANGE, "direct", false,\n\t\tfalse, false, false, nil)\n\tif err != nil {\n\t\treturn fmt.Errorf("Exchange error:%s",err.Error())\n\t}\n\t// 声明队列及绑定\n\tqueues := fmt.Sprintf("%s,%s",QUEUE_NEWUSER,QUEUE_NEWUSER_OTHER01)\n\terr = mq.DecQueuueAndBind(queues, USER_EXCHANGE, USER_REG_ROUTER_KEY)\n\tif err != nil {\n\t\treturn fmt.Errorf("DecQueuueAndBind error:%s",err.Error())\n\t}\n\treturn nil\n}
SendMessage改造
package Lib\n\nimport (\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/AppInit"\n\t"github.com/streadway/amqp"\n\t"log"\n\t"strings"\n)\n\nconst (\n\t// 用户注册队列名称\n\tQUEUE_NEWUSER = "newuser"\n\t// 其他业务的新用户队列\n\tQUEUE_NEWUSER_OTHER01 = "newuser-other01"\n\t// 用户业务交换机\n\tUSER_EXCHANGE = "exchange-user"\n\t// 用户注册路由key\n\tUSER_REG_ROUTER_KEY = "router-key-userreg"\n)\n\ntype MQ struct {\n\tChannel *amqp.Channel\n}\n\nfunc NewMQ() *MQ {\n\tchannel, err := AppInit.GetMQ().Channel()\n\tif err != nil {\n\t\tlog.Println(err)\n\t}\n\treturn &MQ{Channel: channel}\n}\n// DecQueuueAndBind 声明队列及绑定队列,多个队列用逗号隔开\nfunc (this *MQ)DecQueuueAndBind(queues string,exchange string,routerKey string) error {\n\tqueueList := strings.Split(queues,",")\n\tfor _,queue := range queueList {\n\t\t// 声明队列\n\t\tq, err := this.Channel.QueueDeclare(queue, false, false,\n\t\t\tfalse, false, nil)\n\t\tif err != nil {\n\t\t\treturn err\n\t\t}\n\t\t// 绑定交换机和路由key\n\t\terr = this.Channel.QueueBind(q.Name, routerKey, exchange, false, nil)\n\t\tif err != nil {\n\t\t\treturn err\n\t\t}\n\t}\n\treturn nil\n}\n\n// SendMessage 发生消息到mq\nfunc (this *MQ) SendMessage(key string, exchange string,message string) error {\n\treturn this.Channel.Publish(exchange, key, false, false,\n\t\tamqp.Publishing{\n\t\t\tContentType: "text/plain",\n\t\t\tBody: []byte(message),\n\t\t})\n}
拉起gin框架时初始化队列
package main\n\nimport (\n\t"context"\n\t"encoding/json"\n\t"fmt"\n\t"github.com/gin-gonic/gin"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/UserReg/Models/User"\n\t"log"\n\t"net/http"\n\t"os"\n\t"os/signal"\n\t"syscall"\n\t"time"\n)\n\nfunc main() {\n\terrchan := make(chan error)\n\tengine := gin.Default()\n\tengine.Handle(http.MethodPost,"/user", func(ctx *gin.Context) {\n\t\tuser := User.NewUserModel()\n\t\terr := ctx.ShouldBindJSON(user)\n\t\tif err != nil {\n\t\t\tctx.JSON(400,err.Error())\n\t\t\treturn\n\t\t}\n\t\tuser.UserID =time.Now().Unix() // 模拟用户注册入库\n\t\tif user.UserID > 0 { // 假设入库成功\n\t\t\tbytes, _ := json.Marshal(user)\n\t\t\tmq := Lib.NewMQ()\n\t\t\terr := mq.SendMessage(Lib.USER_REG_ROUTER_KEY, Lib.USER_EXCHANGE, string(bytes))\n\t\t\tif err != nil {\n\t\t\t\tlog.Println(err)\n\t\t\t\terrchan
8.客户端消费注册用户消息、确认消息
模拟消费
接收消息
模拟发生邮件
ack 确认
消费
package main\n\nimport (\n\t"encoding/json"\n\t"fmt"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/UserReg/Models/User"\n\t"github.com/streadway/amqp"\n\t"time"\n)\n\nfunc SendMail(msgs
MQ.go 新增Consume方法
package Lib\n\nimport (\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/AppInit"\n\t"github.com/streadway/amqp"\n\t"log"\n\t"strings"\n)\n\nconst (\n\t// 用户注册队列名称\n\tQUEUE_NEWUSER = "newuser"\n\t// 其他业务的新用户队列\n\tQUEUE_NEWUSER_OTHER01 = "newuser-other01"\n\t// 用户业务交换机\n\tUSER_EXCHANGE = "exchange-user"\n\t// 用户注册路由key\n\tUSER_REG_ROUTER_KEY = "router-key-userreg"\n)\n\ntype MQ struct {\n\tChannel *amqp.Channel\n}\n\nfunc NewMQ() *MQ {\n\tchannel, err := AppInit.GetMQ().Channel()\n\tif err != nil {\n\t\tlog.Println(err)\n\t}\n\treturn &MQ{Channel: channel}\n}\n// DecQueuueAndBind 声明队列及绑定队列,多个队列用逗号隔开\nfunc (this *MQ)DecQueuueAndBind(queues string,exchange string,routerKey string) error {\n\tqueueList := strings.Split(queues,",")\n\tfor _,queue := range queueList {\n\t\t// 声明队列\n\t\tq, err := this.Channel.QueueDeclare(queue, false, false,\n\t\t\tfalse, false, nil)\n\t\tif err != nil {\n\t\t\treturn err\n\t\t}\n\t\t// 绑定交换机和路由key\n\t\terr = this.Channel.QueueBind(q.Name, routerKey, exchange, false, nil)\n\t\tif err != nil {\n\t\t\treturn err\n\t\t}\n\t}\n\treturn nil\n}\n\n// SendMessage 发生消息到mq\nfunc (this *MQ) SendMessage(key string, exchange string,message string) error {\n\treturn this.Channel.Publish(exchange, key, false, false,\n\t\tamqp.Publishing{\n\t\t\tContentType: "text/plain",\n\t\t\tBody: []byte(message),\n\t\t})\n}\n\nfunc (this *MQ)Consume(queueName string, key string,callback func(
9. 多消费者消费消息、重新入列
消费者改造支持多消费者
package main\n\nimport (\n\t"encoding/json"\n\t"flag"\n\t"fmt"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/UserReg/Models/User"\n\t"github.com/streadway/amqp"\n\t"log"\n\t"time"\n)\n\nfunc SendMail(msgs
10.消费者限流:ACK后再收新消息
代码改造一下,使用协程消费
package main\n\nimport (\n\t"encoding/json"\n\t"flag"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/UserReg/Models/User"\n\t"github.com/streadway/amqp"\n\t"log"\n\t"time"\n)\n\nfunc Send(c string, msg amqp.Delivery) error {\n\ttime.time.Second * 3)\n\tuserModel := &User.UserModel{}\n\tjson.Unmarshal(msg.Body, userModel)\n\tlog.Printf("消费者:%s,向userid=%d的用户发生邮件n",c,userModel.UserID)\n\tmsg.Ack(false)\n\treturn nil\n}\n\nfunc SendMail(msgs
消费者限流
mq.Channel.Qos(2, 0, false) 第一个参数prefetchCount 可以限制,当接受prefetchCount 条消息后
只有ack 之后可以继续接收消费下一条消息,起到保护消费者的作用
mq.Channel.Qos(2, 0, false)
11. 开启模式、记录失败的消息
当生产消息时,由于mq 的网络问题或是其他问题,可能出现发送失败的情况
当有些敏感信息又不能失败,需要确保每一条消息都发送成功
因此mq 有一个机制就是可以开发 模式,当给mq发送消息时,如果成功会有一个ack 回执
ack 成功说明发送消息成功,ack 失败时需要记录日志(写到mysql 或redis)什么的进行重发
第一步
开启模式
// SetConfirm 设置模式\nfunc (this *MQ)Setconfirm() {\n\terr := this.Channel.confirm(false)\n\tif err != nil {\n\t\tlog.Println(err)\n\t}\n}
在MQ 的结构体添加属性
type MQ struct {\n\tChannel *amqp.Channel\n\tnotifyConfirm chan amqp.Confirmation\n}
// SetConfirm 设置模式\nfunc (this *MQ)Setconfirm() {\n\terr := this.Channel.confirm(false)\n\tif err != nil {\n\t\tlog.Println(err)\n\t}\n\tthis.notifyConfirm = this.Channel.NotifyPublish(make(chan amqp.Confirmation))\n}
发生消息后,当服务器确认此chan 会有数据传输过来
发生消息时调用SetConfirm 开启 模式 完整代码
MQ.go
package Lib\n\nimport (\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/AppInit"\n\t"github.com/streadway/amqp"\n\t"log"\n\t"strings"\n)\n\nconst (\n\t// 用户注册队列名称\n\tQUEUE_NEWUSER = "newuser"\n\t// 其他业务的新用户队列\n\tQUEUE_NEWUSER_OTHER01 = "newuser-other01"\n\t// 用户业务交换机\n\tUSER_EXCHANGE = "exchange-user"\n\t// 用户注册路由key\n\tUSER_REG_ROUTER_KEY = "router-key-userreg"\n)\n\ntype MQ struct {\n\tChannel *amqp.Channel\n\tnotifyConfirm chan amqp.Confirmation\n}\n\nfunc NewMQ() *MQ {\n\tchannel, err := AppInit.GetMQ().Channel()\n\tif err != nil {\n\t\tlog.Println(err)\n\t}\n\treturn &MQ{Channel: channel}\n}\n// DecQueuueAndBind 声明队列及绑定队列,多个队列用逗号隔开\nfunc (this *MQ)DecQueuueAndBind(queues string,exchange string,routerKey string) error {\n\tqueueList := strings.Split(queues,",")\n\tfor _,queue := range queueList {\n\t\t// 声明队列\n\t\tq, err := this.Channel.QueueDeclare(queue, false, false,\n\t\t\tfalse, false, nil)\n\t\tif err != nil {\n\t\t\treturn err\n\t\t}\n\t\t// 绑定交换机和路由key\n\t\terr = this.Channel.QueueBind(q.Name, routerKey, exchange, false, nil)\n\t\tif err != nil {\n\t\t\treturn err\n\t\t}\n\t}\n\treturn nil\n}\n\n// SetConfirm 设置模式\nfunc (this *MQ)Setconfirm() {\n\terr := this.Channel.confirm(false)\n\tif err != nil {\n\t\tlog.Println(err)\n\t}\n\tthis.notifyConfirm = this.Channel.NotifyPublish(make(chan amqp.Confirmation))\n\tgo this.Listenconfirm()\n}\n// ListenConfirm 监听消息\nfunc (this *MQ)Listenconfirm() {\n\tdefer this.Channel.Close()\n\tret :=
生产者
package main\n\nimport (\n\t"context"\n\t"encoding/json"\n\t"fmt"\n\t"github.com/gin-gonic/gin"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/UserReg/Models/User"\n\t"log"\n\t"net/http"\n\t"os"\n\t"os/signal"\n\t"syscall"\n\t"time"\n)\n\nfunc main() {\n\terrchan := make(chan error)\n\tengine := gin.Default()\n\tengine.Handle(http.MethodPost,"/user", func(ctx *gin.Context) {\n\t\tuser := User.NewUserModel()\n\t\terr := ctx.ShouldBindJSON(user)\n\t\tif err != nil {\n\t\t\tctx.JSON(400,err.Error())\n\t\t\treturn\n\t\t}\n\t\tuser.UserID =time.Now().Unix() // 模拟用户注册入库\n\t\tif user.UserID > 0 { // 假设入库成功\n\t\t\tbytes, _ := json.Marshal(user)\n\t\t\tmq := Lib.NewMQ()\n\t\t\t// 开启 模式\n\t\t\tmq.Setconfirm()\n\t\t\terr := mq.SendMessage(Lib.USER_REG_ROUTER_KEY, Lib.USER_EXCHANGE, string(bytes))\n\t\t\tif err != nil {\n\t\t\t\tlog.Println(err)\n\t\t\t\terrchan
12.监听消息入列回执:NotifyReturn的用法
mandatory参数
如果为true,在exchange正常且可以到达的情况下。
如果exchange+routeKey 无法投递给queue,那么MQ会将消息还给生产者
如果为false,则直接丢弃
模拟无法投递到exchange+routeKey 通过rabbitmq 管理后台,手动解绑(写多个队列的需要全部解绑)
package Lib\n\nimport (\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/AppInit"\n\t"github.com/streadway/amqp"\n\t"log"\n\t"strings"\n)\n\nconst (\n\t// 用户注册队列名称\n\tQUEUE_NEWUSER = "newuser"\n\t// 其他业务的新用户队列\n\tQUEUE_NEWUSER_OTHER01 = "newuser-other01"\n\t// 用户业务交换机\n\tUSER_EXCHANGE = "exchange-user"\n\t// 用户注册路由key\n\tUSER_REG_ROUTER_KEY = "router-key-userreg"\n)\n\ntype MQ struct {\n\tChannel *amqp.Channel\n\tnotifyConfirm chan amqp.Confirmation\n\n\t// NotifyReturn的用法\n\tnotifyReturn chan amqp.Return\n}\n\nfunc NewMQ() *MQ {\n\tchannel, err := AppInit.GetMQ().Channel()\n\tif err != nil {\n\t\tlog.Println(err)\n\t}\n\treturn &MQ{Channel: channel}\n}\n// DecQueuueAndBind 声明队列及绑定队列,多个队列用逗号隔开\nfunc (this *MQ)DecQueuueAndBind(queues string,exchange string,routerKey string) error {\n\tqueueList := strings.Split(queues,",")\n\tfor _,queue := range queueList {\n\t\t// 声明队列\n\t\tq, err := this.Channel.QueueDeclare(queue, false, false,\n\t\t\tfalse, false, nil)\n\t\tif err != nil {\n\t\t\treturn err\n\t\t}\n\t\t// 绑定交换机和路由key\n\t\terr = this.Channel.QueueBind(q.Name, routerKey, exchange, false, nil)\n\t\tif err != nil {\n\t\t\treturn err\n\t\t}\n\t}\n\treturn nil\n}\n\n// SetConfirm 设置模式\nfunc (this *MQ)Setconfirm() {\n\terr := this.Channel.confirm(false)\n\tif err != nil {\n\t\tlog.Println(err)\n\t}\n\tthis.notifyConfirm = this.Channel.NotifyPublish(make(chan amqp.Confirmation))\n\tgo this.Listenconfirm()\n}\n// ListenConfirm 监听消息\nfunc (this *MQ)Listenconfirm() {\n\tdefer this.Channel.Close()\n\tret :=
SendMessage 之前需要 NotifyReturn
package main\n\nimport (\n\t"context"\n\t"encoding/json"\n\t"fmt"\n\t"github.com/gin-gonic/gin"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/UserReg/Models/User"\n\t"log"\n\t"net/http"\n\t"os"\n\t"os/signal"\n\t"syscall"\n\t"time"\n)\n\nfunc main() {\n\terrchan := make(chan error)\n\tengine := gin.Default()\n\tengine.Handle(http.MethodPost,"/user", func(ctx *gin.Context) {\n\t\tuser := User.NewUserModel()\n\t\terr := ctx.ShouldBindJSON(user)\n\t\tif err != nil {\n\t\t\tctx.JSON(400,err.Error())\n\t\t\treturn\n\t\t}\n\t\tuser.UserID =time.Now().Unix() // 模拟用户注册入库\n\t\tif user.UserID > 0 { // 假设入库成功\n\t\t\tbytes, _ := json.Marshal(user)\n\t\t\tmq := Lib.NewMQ()\n\t\t\t// 开启 模式\n\t\t\tmq.Setconfirm()\n\n\t\t\t// 监听return 需要在发送消息之前\n\t\t\tmq.NotifyReturn()\n\t\t\terr := mq.SendMessage(Lib.USER_REG_ROUTER_KEY, Lib.USER_EXCHANGE, string(bytes))\n\t\t\tif err != nil {\n\t\t\t\tlog.Println(err)\n\t\t\t\terrchan
13. 以用户注册为例产生的事务需求、延迟队列使用
基本实现
生产者注册成功之后发生消息
消息者接受消息后,调用邮件服务
调用失败。重新入列(要加个延迟时间,失败次数越多,延迟时间越长)
超过最大重试次数。就不发邮件了
安装插件
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
这是一个延迟交换机插件。省去我们自己写规则的麻烦
由于我们使用的是3.8.10。因此使用3.8.10对应的插件
拷贝plugins中,容器对应的目录是/opt/rabbitmq/plugins
docker cp rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez my-rmq:/opt/rabbitmq/plugins
启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
延迟队列使用
官方文档:
// ... elided code ...\nMap args = new HashMap();\nargs.put("x-delayed-type", "direct");\nchannel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);\n// ... more code ...
// ... elided code ...\nbyte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");\nMap headers = new HashMap();\nheaders.put("x-delay", 5000);\nAMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);\nchannel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);\n\nbyte[] messageBodyBytes2 = "more delayed payload".getBytes("UTF-8");\nMap headers2 = new HashMap();\nheaders2.put("x-delay", 1000);\nAMQP.BasicProperties.Builder props2 = new AMQP.BasicProperties.Builder().headers(headers2);\nchannel.basicPublish("my-exchange", "", props2.build(), messageBodyBytes2);\n// ... more code ...
因此go 代码改动
定义交互机的kind 应该使用x-delayed-message,args 用map[string]interface{}{"x-delayed-type":"direct"}
// UserDelayInit 创建用户延迟交换机\nfunc UserDelayInit() error {\n\tmq := NewMQ()\n\tif mq == nil {\n\t\treturn fmt.Errorf("UserDelayInit init error")\n\t}\n\tdefer mq.Channel.Close()\n\t// 声明交换机\n\terr := mq.Channel.ExchangeDeclare(USER_EXCHANGE_DELAY, "x-delayed-message", false, false,\n\t\tfalse, false, map[string]interface{}{"x-delayed-type":"direct"})\n\tif err != nil {\n\t\treturn fmt.Errorf("UserDelayInit ExchangeDeclare error")\n\t}\n\t// 声明队列名称及绑定\n\tqueues := fmt.Sprintf("%s",QUEUE_NEWUSER)\n\terr = mq.DecQueuueAndBind(queues, USER_EXCHANGE_DELAY, USER_REG_ROUTER_KEY)\n\tif err != nil {\n\t\treturn fmt.Errorf("DecQueuueAndBind error:%s",err.Error())\n\t}\n\treturn nil\n}
发送消息时需要设置Headers: map[string]interface{}{"x-delay":delay}, // 单位毫秒
// SendDelayMessage 发生延迟消息到mq\n// delay 单位是ms\nfunc (this *MQ) SendDelayMessage(key string, exchange string,message string,delay int) error {\n\treturn this.Channel.Publish(exchange, key, true, false,\n\t\tamqp.Publishing{\n\t\t\tHeaders: map[string]interface{}{"x-delay":delay}, // 单位毫秒\n\t\t\tContentType: "text/plain",\n\t\t\tBody: []byte(message),\n\t\t})\n}
生产部分
go func() {\n err := Lib.UserQueueInit()\n if err !=nil {\n errchan
消费者不需要调整
消息者接收消息时会延迟delay 毫秒后收到
14. 记录消费者调用失败次数、逼格SQL技巧
首先建表 user_notify
`user_notify` (\n `user_id` int(11) NOT NULL,\n `notify_num` int(11) NOT NULL DEFAULT '1',\n `is_done` int(11) NOT NULL DEFAULT '0',\n `updatetime` datetime NOT NULL,\n PRIMARY KEY (`user_id`)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
一旦调用邮件服务失败,写入这个表
使用MySQL库
扩展库:https://github.com/jmoiron/sqlx
安装
go get -u github.com/jmoiron/sqlx
mysql驱动:https://github.com/go-sql-driver/mysql
安装
go get -u github.com/go-sql-driver/mysql
mysql记录常规做法(伪代码)
开启事物\n 取出该条记录。\n if 没有\n insert info ...\n else \n if notify_num >=5 // 失败5次,程序里订阅\n \n\t\t\tuser_money=user_money-:money \n\t\t\twhere user_name=:from =:money`\n\n\tresult, err := tx.NamedExec(sql1, tm)\n\n\tif err != nil {\n\t\treturn err\n\t}\n\taffected, err := result.RowsAffected()\n\tif err != nil {\n\t\treturn err\n\t}\n\t// 受影响行为0 代表木有扣款\n\tif affected == 0 {\n\t\terr = tx.Rollback() // 回滚\n\t\tif err != nil {\n\t\t\treturn err\n\t\t}\n\t\treturn fmt.Errorf("扣款失败1111")\n\t}\n\tsql2 := "(:from,:to,:money,NOW())"\n\n\t// 写日志表\n\tresult, err = tx.NamedExec(sql2, tm)\n\tif err != nil {\n\t\terr2 := tx.Rollback() // 回滚\n\t\tif err2 != nil {\n\t\t\treturn err\n\t\t}\n\t\treturn fmt.Errorf("扣款失败2222:%s",err.Error())\n\t}\n\taffected, err = result.RowsAffected()\n\tif err != nil {\n\t\terr = tx.Rollback() // 回滚\n\t\tif err != nil {\n\t\t\treturn err\n\t\t}\n\t\treturn fmt.Errorf("扣款失败3333",err.Error())\n\t}\n\t// 受影响行为0 代表木有扣款\n\tif affected == 0 {\n\t\terr = tx.Rollback() // 回滚\n\t\tif err != nil {\n\t\t\treturn err\n\t\t}\n\t\treturn fmt.Errorf("扣款失败,写日志表出错")\n\t}\n\ttx.Commit()\n\treturn nil\n}
a公司实际扣款操作
package main\n\nimport (\n\t"fmt"\n\t"github.com/gin-gonic/gin"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Trans"\n\t"log"\n\t"net/http"\n\t"os"\n\t"os/signal"\n\t"syscall"\n)\n\nfunc main() {\n\tengine := gin.Default()\n\tengine.Use(Trans.HandleErr())\n\tengine.POST("/", func(ctx *gin.Context) {\n\t\ttransModel := Trans.NewTransModel()\n\t\terr := ctx.ShouldBindJSON(&transModel)\n\t\tTrans.CheckErr(err,"ShouldBindJSON error:")\n\n\t\t// 执行转账\n\t\terr = Trans.TransMoney(transModel)\n\t\tTrans.CheckErr(err,"TransMoney error:")\n\t\tctx.JSON(200,gin.H{"result":transModel.String()})\n\n\t})\n\terrChan := make(chan error)\n\tserver := http.Server{\n\t\tAddr: ":6060",\n\t\tHandler: engine,\n\t}\n\tgo func() {\n\t\terr := server.ListenAndServe()\n\t\tif err != nil {\n\t\t\terrChan
18.A公司转账业务逻辑:记录日志后发送消息到mq
初始化mq 队列
func TransInit() error {\n\tmq := NewMQ()\n\tif mq == nil {\n\t\treturn fmt.Errorf("UserDelayInit init error")\n\t}\n\tdefer mq.Channel.Close()\n\terr := mq.Channel.ExchangeDeclare(TRANS_EXCHANGE, "direct", false,\n\t\tfalse, false, false, nil)\n\tif err != nil {\n\t\treturn fmt.Errorf("mq.Channel.ExchangeDeclare TRANS_EXCHANGE error:%s",err.Error())\n\t}\n\terr = mq.DecQueuueAndBind(TRANS_QUEUE, TRANS_EXCHANGE, TRANS_ROUTER_KEY)\n\tif err != nil {\n\t\treturn fmt.Errorf("trans DecQueuueAndBind error:%s",err.Error())\n\t}\n\treturn nil\n}
拉起框架时,起协程拉起mq
// 初始化队列\n\tgo func() {\n\t\terr := Lib.TransInit()\n\t\tif err != nil {\n\t\t\terrChan
发送消息到mq
engine.POST("/", func(ctx *gin.Context) {\n transModel := Trans.NewTransModel()\n err := ctx.ShouldBindJSON(&transModel)\n Trans.CheckErr(err,"ShouldBindJSON error:")\n\n // 执行转账\n err = Trans.TransMoney(transModel)\n Trans.CheckErr(err,"TransMoney error:")\n\n // 写mq\n mq := Lib.NewMQ()\n jsonBytes, _ := json.Marshal(transModel)\n err = mq.SendMessage(Lib.TRANS_ROUTER_KEY, Lib.TRANS_EXCHANGE, string(jsonBytes))\n Trans.CheckErr(err,"发送消息队列失败了")\n ctx.JSON(200,gin.H{"result":transModel.String()})\n\n})
19.A公司转账业务逻辑:定时”无脑”补偿机制(上)
不管发送成功与否
思路如下: 1、我们写个 “死循环”程序
2、定时取5秒或自定义秒内 :status==0 的数据,再发一次消息
3、设定定时任务。定时清理20秒内(或自定义)status==0的消息,把它改为status=2
定时任务 第三方库
看这里 https://github.com/robfig/cron
安装: go get github.com/robfig/cron/[email protected]
https://www.jtthink.com/course/play/2461
定时任务补偿代码
package main\n\nimport (\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Trans"\n\t"github.com/robfig/cron/v3"\n\t"log"\n)\n\nconst failSql = ` STATUS=2 where \n\t\tTIMESTAMPDIFF(SECOND,updatetime,now())>30 >2`\n\n\nvar MyCron *cron.Cron\n\nfunc CronInit()error {\n\tMyCron = cron.New(cron.WithSeconds())\n\t_, err := MyCron.AddFunc("0/3 * * * * *", FailTransLog)\n\treturn err\n}\n\n// 定时取消订单\nfunc FailTransLog() {\n\t_, err := Trans.GetDB().(failSql)\n\tif err != nil {\n\t\tlog.Println(err)\n\t}\n\tlog.Println("更新成功")\n}\n\nfunc main() {\n\terrChan := make(chan error)\n\tgo func() {\n\t\terr := Trans.InitDB("a")\n\t\tif err != nil {\n\t\t\terrChan
20.A公司转账逻辑: 补偿机制之交易失败后“还钱 ”
两个任务
取消交易
STATUS=2 where TIMESTAMPDIFF(SECOND,updatetime,now())>20 >2
_, err := MyCron.AddFunc("0/3 * * * * *", FailTransLog)\n\tif err != nil {\n\t\treturn err\n\t}
还钱
`,money from `translog` where `status`=2 0 limit 10
// 还钱\n_, err = MyCron.AddFunc("0/4 * * * * *", BackMoney)\nif err != nil {\n return err\n}
SQL
首先加个字段: isback ,money from translog where status=2 0 limit 10
这里面为了防止 数据不一致,都要依赖数据库事务
做个统一的事务提交
func clearTx(tx *sqlx.Tx) {\n\terr := tx.Commit()\n\tif err != nil && err != sql.ErrTxDone {\n\t\tlog.Println("tx err",err)\n\t}\n\tislock = false\n}
全部代码
package main\n\nimport (\n\t"context"\n\t"database/sql"\n\t"github.com/jmoiron/sqlx"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Trans"\n\t"github.com/robfig/cron/v3"\n\t"log"\n\t"time"\n)\n\n// 取消订单\nconst failSql = ` STATUS=2 where \n\t\tTIMESTAMPDIFF(SECOND,updatetime,now())>30 >2`\n\n// 从日志表里取出status=2 并且isback=0 的进行还钱操作\nconst backSql = "`,money from `translog` where `status`=2 0 limit 10"\n\n// 锁 防止上一个任务没执行完,下一个任务又开始了,产生脏读(只适用于单线程,通过变量控制锁)\nvar islock = false\n\nfunc clearTx(tx *sqlx.Tx) {\n\terr := tx.Commit()\n\tif err != nil && err != sql.ErrTxDone {\n\t\tlog.Println("tx err",err)\n\t}\n\tislock = false\n}\n// 还钱\nfunc BackMoney() {\n\tif islock {\n\t\tlog.Println("已经锁住了")\n\t\treturn\n\t}\n\ttxx, err := Trans.GetDB().BeginTxx(context.Background(), nil)\n\n\tif err != nil {\n\t\tlog.Println("事务失败",err)\n\t\treturn\n\t}\n\tislock = true // 加锁\n\tdefer clearTx(txx) // 清理事物\n\ttime.time.Second * 8)\n\n\trows, err := txx.Queryx(backSql)\n\tif err != nil {\n\t\tlog.Println("Queryx err:",err)\n\t\ttxx.Rollback()\n\t}\n\n\tdefer rows.Close()\n\n\ttransModels := []Trans.TransModel{}\n\terr = sqlx.StructScan(rows, &transModels)\n\tif err != nil {\n\t\tlog.Println("StructScan err:",err)\n\t\ttxx.Rollback()\n\t}\n\n\t// 还钱操作\n\tfor _, row := range transModels {\n\t\t_, err = txx.(" user_money=user_money+? where user_name=?",\n\t\t\trow.Money, row.From)\n\t\tif err !=nil {\n\t\t\ttxx.Rollback()\n\t\t}\n\t\t_, err = txx.(" isback=1 where tid=?",row.Tid)\n\t\tif err !=nil {\n\t\t\ttxx.Rollback()\n\t\t}\n\t}\n}\n\n\nvar MyCron *cron.Cron\n\nfunc CronInit()error {\n\tMyCron = cron.New(cron.WithSeconds())\n\t_, err := MyCron.AddFunc("0/3 * * * * *", FailTransLog)\n\tif err != nil {\n\t\treturn err\n\t}\n\t// 还钱\n\t_, err = MyCron.AddFunc("0/4 * * * * *", BackMoney)\n\tif err != nil {\n\t\treturn err\n\t}\n\treturn err\n}\n\n// 定时取消订单\nfunc FailTransLog() {\n\t_, err := Trans.GetDB().(failSql)\n\tif err != nil {\n\t\tlog.Println(err)\n\t}\n\tlog.Println("更新成功")\n}\n\nfunc main() {\n\terrChan := make(chan error)\n\tgo func() {\n\t\terr := Trans.InitDB("a")\n\t\tif err != nil {\n\t\t\terrChan
21.补偿机制之重发MQ消息、B公司记录日志
今天完成的任务是
取出交易时间在8秒内,且status=0的数据,进行MQ 重发
1、SQL如下 translog where TIMESTAMPDIFF(SECOND,updatetime,now())<=8 0
2、定时器设置:为 每隔2秒 处理。
B公司日志表
和A公司一样。 不需要IsBack\n\ntid 注意 不需要自增
b公司消费代码
package main\n\nimport (\n\t"encoding/json"\n\t"flag"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Trans"\n\t"github.com/streadway/amqp"\n\t"log"\n\t"fmt"\n)\n\nfunc saveLog(tm *Trans.TransModel, msg amqp.Delivery) {\n\tfmt.Println(tm.Tid,tm.From,tm.Money)\n\tsql := "(?,?,?,?,now())"\n\t_, err := Trans.GetDB().(sql, tm.Tid,tm.From,tm.To,tm.Money)\n\tif err != nil {\n\t\tlog.Println("1111",err)\n\t}\n\tmsg.Ack(false)\n}\n\nfunc myconsumer(messages
22.B公司业务逻辑:确认收钱
A和B 要约定个 回调地址(A是回调地址) http://localhost:8080/callback-----A
参数:tid
SQL status=1 where tid=? 0
A 公司回调接口
// 回调接口\nengine.POST("/callback", func(ctx *gin.Context) {\n tid := ctx.PostForm("tid")\n sql := " `status`=1 where tid=? 0"\n result, err := Trans.GetDB().(sql, tid)\n affected, err2 := result.RowsAffected()\n if err != nil || err2 != nil || affected != 1 {\n ctx.String(200,"error")\n } else {\n ctx.String(200,"success")\n }\n})
B公司使用mysql 事物保证日志及确认收钱及回调成功
B消费者 消费到记录后 执行两个过程 1) 插记录 2)把钱更新给用户 3) 回调接口 3步必须都成功。否则回滚数据库。
package main\n\nimport (\n\t"context"\n\t"database/sql"\n\t"encoding/json"\n\t"flag"\n\t"github.com/jmoiron/sqlx"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"\n\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Trans"\n\t"github.com/streadway/amqp"\n\t"io/ioutil"\n\t"log"\n\t"fmt"\n\t"net/http"\n\t"strings"\n)\n\n\nfunc clearTx(tx *sqlx.Tx) {\n\terr := tx.Commit()\n\tif err != nil && err != sql.ErrTxDone {\n\t\tlog.Println("clearTx error",err)\n\t}\n}\n\n\nfunc saveLog(tm *Trans.TransModel, msg amqp.Delivery) {\n\tfmt.Println(tm.Tid,tm.From,tm.Money)\n\tsql := "(?,?,?,?,now())"\n\t_, err := Trans.GetDB().(sql, tm.Tid,tm.From,tm.To,tm.Money)\n\tif err != nil {\n\t\tlog.Println("1111",err)\n\t}\n\tmsg.Ack(false)\n}\n\nfunc saveLogWithTx(tm *Trans.TransModel, msg amqp.Delivery) {\n\ttxx, err := Trans.GetDB().BeginTxx(context.Background(), nil)\n\tif err != nil {\n\t\tlog.Println("BeginTxx error",err)\n\t\treturn\n\t}\n\n\tdefer clearTx(txx)\n\tsql := "(?,?,?,?,now())"\n\t_, err = txx.(sql, tm.Tid, tm.From, tm.To, tm.Money)\n\tif err != nil {\n\t\tlog.Println("(:order_no,:order_user,:order_time)", req)\n\t\t\tif err != nil {\n\t\t\t\tlog.Println(err)\n\t\t\t}\n\t\t}\n\t\tmsg.Ack(false)\n\t}\n}\n\n\nfunc main() {\n\tmq := Lib.NewMQ()\n\n\tdefer mq.Channel.Close()\n\n\terr := Trans.InitDB("a")\n\tif err != nil {\n\t\tlog.Fatal(err)\n\t}\n\n\tmq.Channel.Qos(2,0,false)\n\n\tmq.Consume(Lib.ORDER_QUEUE,"消费者1", saveOrder)\n}
查看文章精彩评论,请前往什么值得买进行阅读互动
","gnid":"98341c1f478db84bf","img_data":[{"flag":2,"img":[{"desc":"","height":"389","title":"","url":"https://p0.ssl.img.360kuai.com/t018d54c51ce390ac56.webp","width":"702"}]}],"original":0,"pat":"art_src_1,fts0,sts0","powerby":"pika","pub_time":1695891920000,"pure":"","rawurl":"http://zm.news.so.com/bab4d4d9485a5d4b43919ad4d896220e","redirect":0,"rptid":"526da860dcbebebb","rss_ext":[],"s":"t","src":"什么值得买","tag":[],"title":"go web+RabbitMQ实战速学
易贝盆942为什么我要放弃 RabbitMQ -
冶迹康17684295957 ______ 1. 需要“消息延迟”功能 这对我们来说是很重要的业务需求.当顾客订了一个服务,首先我们会发送描述相信指令的短信,然后我们会在两分钟后发送第二条描述详情的短信,而不是两条一起发送.我们希望通过这样,留给用户阅读的时间(...
易贝盆942rabbitmq适合做长连接吗 -
冶迹康17684295957 ______ 如果可以,最好不要用短链接,尤其是消费者端,因为会耗费很大cpu.而大量连接也会造成内存增多.
易贝盆942如何优雅的使用RabbitMQ -
冶迹康17684295957 ______ 按安装驱动程序,然后Fn配合无线网卡快捷键开启无线网卡,然后就可以用了.
易贝盆942如何查看rabbitmq是否启动了 -
冶迹康17684295957 ______ 可能是系统有问题了.直接换个验证过的系统盘重装系统就行了,这样就可以全程自动、顺利解决 win7系统中软件无法运行 的问题了.用u盘或者硬盘这些都是可以的,且安装速度非常快.但关键是:要有兼容性好的(兼容ide、achi、Raid模式的安装)
易贝盆942rabbitmq必须用root用户启动吗 -
冶迹康17684295957 ______ 需要吧
易贝盆942如何测试 rabbitmq 的性能 -
冶迹康17684295957 ______ 测试 rabbitmq 的性能方法如下:1、声明7个具有不同属性的queue,分别和名为test_exchage的exchange进行绑定(因为exchange为fanout类型,所以测试代码中的routing_key其实是不起作用的);2、向exchange发送具有persistent属性的消息(delivery_mode=2);3、创建7个消费者分别从上述7个queue中获取消息;4、测试结果如下
易贝盆942使用 RabbitMQ 为什么有时候会丢数据 -
冶迹康17684295957 ______ 一般情况下,是配置的原因,应该是你配置了自动确认,又写了代码进行手动确认.当你配置了自动确认时,调用basicConsume方法时rabbitmq服务端返回的确认码不是唯一的,会重复,所以又进行了手动确认,就容易导致消息丢失.如果你用的是java+spring,配置手动确认如下:其中,ackMessageListener是实现了ChannelAwareMessageListener接口的实现类实例.
易贝盆942rabbitmq消息真的可以持久化吗 -
冶迹康17684295957 ______ 消息传递的速度:用MSMQ/RabbitMq,等带持久化功能的队列组件;如果嫌太慢,就用ZeroMq(无消息持久化功能),但可以达到30W消息每秒;事件持久化的速度:由于事件都是.
易贝盆942rabbitmq如何去做流控测试 -
冶迹康17684295957 ______ rabbit mq失败后怎么执行下一个队列1、笨拙点方法,就是轮循,consume的阻塞监听可以设置timeout,通过设置一个较小的timeout,可以轮流监听几个channel,变相实现监听多个queue,对性能要求不是很高,可以使用这种方法2、还有个办...
易贝盆942rabbitmq 支持android或ios吗 -
冶迹康17684295957 ______ 您好,很高兴回答您的问题.Silverlight是建立在windows系统上的,而Android或ios不需要这个软件.这是由系统框架的差异和内核的不同造成的.b